Skip to content

Commit

Permalink
Group Chat Streaming (#166)
Browse files Browse the repository at this point in the history
* first pass at all the pieces needed for threading

* a few more places

* make signing key extend inboxOwner

* get it decoding messages

* dump the latest v3 code

* write a test for creating a v3 client

* use created At

* write test for creating libxmtp client and confirm it works

* move these change to a different branch

* dont pass a conversation

* fix linter

* point to local not dev

* feature flag the client creating of libxmtp while in alpha

* change to local

* fix up the test helper

* feat: fix up the example app

* fix up the 22 compat issue

* Revert "move these change to a different branch"

This reverts commit 8998d13.

* try and get some tests running

* setup local database

* have it create correctly

* write tests for functionality

* test sending

* send encoded content

* add updates to the v3 bindings

* add updates to the v3 bindings

* store in a keystore

* move to preferences

* fix lint

* Fix build issues

* new libxmtp updates

* dump the latest schema

* update to the latest client creation flow

* get the create working again

* use the keystore because its more secure

* fix up linter compat again

* flaky test

* get the tests all passing

* get the example working with groups

* create a group with two addresses

* more tweaks to the example app to get groups working

* add streaming messages to groups

* a few example UI tweaks

* fix the lowercasing issue in the example app

* dump the schema again

* implement all the conversation functionality

* add new codec for membership changes

* write tests for it

* fix up the tests a bit'

* add more tests and group streaming

* get the new codec working as expected

* add pagination to messages

* fix up the library linting issues

* fix up flaky test

* fix up min sdk version issue again

* update the example app

* remove the saved wallet stuff from the demo

* get groups working again with signer improvements and membership changes

* fix linter

* remove syncs so the client will need to manage

* add pagination to group listing

* dont return self for peers and add erroring to new group creation

* update the syncing in the tests

* remove all the streams work and move to another PR

* Revert "remove all the streams work and move to another PR"

This reverts commit 76bfcc7.

* update test

* undo all the bad merge items

* attempt at a few different stream techiniques

* get message streaming working and start on group streaming

* remove unneeded class

* get group streaming working

* fix streaming for both

* fix the linter

* throw error if client doesnt support groups

---------

Co-authored-by: Nicholas Molnar <[email protected]>
  • Loading branch information
nplasterer and neekolas authored Feb 2, 2024
1 parent c6fe0ea commit e041fa8
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class MainViewModel : ViewModel() {
val stream: StateFlow<MainListItem?> =
stateFlow(viewModelScope, null) { subscriptionCount ->
if (ClientManager.clientState.value is ClientManager.ClientState.Ready) {
ClientManager.client.conversations.stream()
ClientManager.client.conversations.streamAll()
.flowWhileShared(
subscriptionCount,
SharingStarted.WhileSubscribed(1000L)
Expand Down
54 changes: 54 additions & 0 deletions library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package org.xmtp.android.library

import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.platform.app.InstrumentationRegistry
import app.cash.turbine.test
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertThrows
Expand Down Expand Up @@ -197,4 +199,56 @@ class GroupTest {
assertEquals(ReactionAction.Added, content?.action)
assertEquals(ReactionSchema.Unicode, content?.schema)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testCanStreamGroupMessages() = kotlinx.coroutines.test.runTest {
val group = boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))

group.streamMessages().test {
group.send("hi")
assertEquals("hi", awaitItem().body)
group.send("hi again")
assertEquals("hi again", awaitItem().body)
}
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testCanStreamDecryptedGroupMessages() = kotlinx.coroutines.test.runTest {
val group = boClient.conversations.newGroup(listOf(alix.walletAddress))

group.streamDecryptedMessages().test {
group.send("hi")
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
group.send("hi again")
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
}
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testCanStreamGroups() = kotlinx.coroutines.test.runTest {
boClient.conversations.streamGroups().test {
val group =
alixClient.conversations.newGroup(listOf(bo.walletAddress))
assertEquals(group.id.toHex(), awaitItem().topic)
val group2 =
caroClient.conversations.newGroup(listOf(bo.walletAddress))
assertEquals(group2.id.toHex(), awaitItem().topic)
}
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testCanStreamGroupsAndConversations() = kotlinx.coroutines.test.runTest {
boClient.conversations.streamAll().test {
val group =
caroClient.conversations.newGroup(listOf(bo.walletAddress))
assertEquals(group.id.toHex(), awaitItem().topic)
val conversation =
boClient.conversations.newConversation(alix.walletAddress)
assertEquals(conversation.topic, awaitItem().topic)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,15 @@ sealed class Conversation {
return when (this) {
is V1 -> conversationV1.streamMessages()
is V2 -> conversationV2.streamMessages()
is Group -> throw XMTPException("Coming follow up PR")
is Group -> group.streamMessages()
}
}

fun streamDecryptedMessages(): Flow<DecryptedMessage> {
return when (this) {
is V1 -> conversationV1.streamDecryptedMessages()
is V2 -> conversationV2.streamDecryptedMessages()
is Group -> throw XMTPException("Coming follow up PR")
is Group -> group.streamDecryptedMessages()
}
}

Expand Down
21 changes: 20 additions & 1 deletion library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package org.xmtp.android.library
import android.util.Log
import io.grpc.StatusException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
Expand All @@ -32,7 +35,9 @@ import org.xmtp.android.library.messages.walletAddress
import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData
import org.xmtp.proto.message.contents.Contact
import org.xmtp.proto.message.contents.Invitation
import uniffi.xmtpv3.FfiConversationCallback
import uniffi.xmtpv3.FfiConversations
import uniffi.xmtpv3.FfiGroup
import uniffi.xmtpv3.FfiListConversationsOptions
import java.util.Date
import kotlin.time.Duration.Companion.nanoseconds
Expand Down Expand Up @@ -477,7 +482,6 @@ data class Conversations(
client.subscribeTopic(
listOf(Topic.userIntro(client.address), Topic.userInvite(client.address)),
).collect { envelope ->

if (envelope.contentTopic == Topic.userIntro(client.address).description) {
val conversationV1 = fromIntro(envelope = envelope)
if (!streamedConversationTopics.contains(conversationV1.topic)) {
Expand All @@ -496,6 +500,21 @@ data class Conversations(
}
}

fun streamAll(): Flow<Conversation> {
return merge(streamGroups(), stream())
}

fun streamGroups(): Flow<Conversation> = callbackFlow {
val groupCallback = object : FfiConversationCallback {
override fun onConversation(conversation: FfiGroup) {
trySend(Conversation.Group(Group(client, conversation)))
}
}
val stream = libXMTPConversations?.stream(groupCallback)
?: throw XMTPException("Client does not support Groups")
awaitClose { stream.end() }
}

/**
* Get the stream of all messages of the current [Client]
* @return Flow object of [DecodedMessage] that represents all the messages of the
Expand Down
29 changes: 28 additions & 1 deletion library/src/main/java/org/xmtp/android/library/Group.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.xmtp.android.library

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.codecs.ContentCodec
import org.xmtp.android.library.codecs.EncodedContent
Expand All @@ -10,6 +13,8 @@ import org.xmtp.android.library.messages.PagingInfoSortDirection
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
import uniffi.xmtpv3.FfiGroup
import uniffi.xmtpv3.FfiListMessagesOptions
import uniffi.xmtpv3.FfiMessage
import uniffi.xmtpv3.FfiMessageCallback
import java.util.Date
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.DurationUnit
Expand All @@ -34,7 +39,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
runBlocking {
libXMTPGroup.send(contentBytes = encodedContent.toByteArray())
}
return id.toString()
return id.toHex()
}

fun <T> prepareMessage(content: T, options: SendOptions?): EncodedContent {
Expand Down Expand Up @@ -136,4 +141,26 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
libXMTPGroup.listMembers().map { it.accountAddress }
}
}

fun streamMessages(): Flow<DecodedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(Message(client, message).decode())
}
}

val stream = libXMTPGroup.stream(messageCallback)
awaitClose { stream.end() }
}

fun streamDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(decrypt(Message(client, message)))
}
}

val stream = libXMTPGroup.stream(messageCallback)
awaitClose { stream.end() }
}
}

0 comments on commit e041fa8

Please sign in to comment.