Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group Chat Streaming #166

Merged
merged 81 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
fcb9921
first pass at all the pieces needed for threading
nplasterer Jan 19, 2024
6326d7f
a few more places
nplasterer Jan 19, 2024
ed9ae8c
make signing key extend inboxOwner
nplasterer Jan 19, 2024
ff740e1
get it decoding messages
nplasterer Jan 19, 2024
750eb84
dump the latest v3 code
nplasterer Jan 23, 2024
15db4e8
write a test for creating a v3 client
nplasterer Jan 23, 2024
88a2c77
use created At
nplasterer Jan 23, 2024
53c5897
write test for creating libxmtp client and confirm it works
nplasterer Jan 23, 2024
8998d13
move these change to a different branch
nplasterer Jan 23, 2024
397982a
dont pass a conversation
nplasterer Jan 23, 2024
7bb1534
fix linter
nplasterer Jan 23, 2024
b35f686
point to local not dev
nplasterer Jan 23, 2024
aaecfc3
feature flag the client creating of libxmtp while in alpha
nplasterer Jan 23, 2024
9bbee43
change to local
nplasterer Jan 23, 2024
14b3870
fix up the test helper
nplasterer Jan 23, 2024
5ddf0f5
feat: fix up the example app
nplasterer Jan 23, 2024
305f483
fix up the 22 compat issue
nplasterer Jan 23, 2024
d0d8881
Revert "move these change to a different branch"
nplasterer Jan 23, 2024
296cac4
try and get some tests running
nplasterer Jan 24, 2024
2cb89b7
setup local database
nplasterer Jan 24, 2024
4892ffe
have it create correctly
nplasterer Jan 24, 2024
04abb1c
Merge branch 'np/group-spike' of https://github.com/xmtp/xmtp-android…
nplasterer Jan 24, 2024
9751afe
write tests for functionality
nplasterer Jan 24, 2024
4aedaa3
test sending
nplasterer Jan 24, 2024
bfd9e7b
send encoded content
nplasterer Jan 24, 2024
7915b0c
add updates to the v3 bindings
nplasterer Jan 24, 2024
ca663cf
add updates to the v3 bindings
nplasterer Jan 24, 2024
1e84e4c
store in a keystore
nplasterer Jan 25, 2024
4da7d38
move to preferences
nplasterer Jan 25, 2024
c05f7f8
fix lint
nplasterer Jan 25, 2024
dc42604
Merge branch 'np/group-spike' of https://github.com/xmtp/xmtp-android…
nplasterer Jan 25, 2024
4329033
Fix build issues
neekolas Jan 25, 2024
51cb81f
new libxmtp updates
nplasterer Jan 25, 2024
5e85337
Merge branch 'np/group-conversations' of https://github.com/xmtp/xmtp…
nplasterer Jan 25, 2024
ca7537f
dump the latest schema
nplasterer Jan 25, 2024
3fd9099
update to the latest client creation flow
nplasterer Jan 25, 2024
89d0e63
get the create working again
nplasterer Jan 26, 2024
845d360
use the keystore because its more secure
nplasterer Jan 26, 2024
8862d42
fix up linter compat again
nplasterer Jan 26, 2024
13b7cf9
flaky test
nplasterer Jan 26, 2024
ba3c247
Merge branch 'main' of https://github.com/xmtp/xmtp-android into np/g…
nplasterer Jan 26, 2024
88e3456
Merge branch 'np/group-spike' of https://github.com/xmtp/xmtp-android…
nplasterer Jan 26, 2024
d89f11c
get the tests all passing
nplasterer Jan 26, 2024
ac8a2d5
get the example working with groups
nplasterer Jan 26, 2024
4fc84ea
create a group with two addresses
nplasterer Jan 26, 2024
093c5e4
more tweaks to the example app to get groups working
nplasterer Jan 26, 2024
29c9e07
add streaming messages to groups
nplasterer Jan 26, 2024
e96d81a
a few example UI tweaks
nplasterer Jan 26, 2024
d83938d
fix the lowercasing issue in the example app
nplasterer Jan 26, 2024
6eaf597
dump the schema again
nplasterer Jan 26, 2024
5b676a5
Merge branch 'main' of https://github.com/xmtp/xmtp-android into np/g…
nplasterer Jan 27, 2024
b74f02f
implement all the conversation functionality
nplasterer Jan 30, 2024
33bdafa
add new codec for membership changes
nplasterer Jan 30, 2024
2c7ac43
write tests for it
nplasterer Jan 30, 2024
5c15d7b
fix up the tests a bit'
nplasterer Jan 30, 2024
816106f
add more tests and group streaming
nplasterer Jan 30, 2024
b579a65
get the new codec working as expected
nplasterer Jan 30, 2024
dec2192
add pagination to messages
nplasterer Jan 30, 2024
06e4df0
fix up the library linting issues
nplasterer Jan 30, 2024
6f5e068
fix up flaky test
nplasterer Jan 30, 2024
1cdd848
fix up min sdk version issue again
nplasterer Jan 30, 2024
58ce184
update the example app
nplasterer Jan 30, 2024
5099e29
remove the saved wallet stuff from the demo
nplasterer Jan 30, 2024
5191cc5
get groups working again with signer improvements and membership changes
nplasterer Jan 30, 2024
7641ac7
fix linter
nplasterer Jan 30, 2024
ee6b1cb
remove syncs so the client will need to manage
nplasterer Jan 30, 2024
655ad0d
add pagination to group listing
nplasterer Jan 30, 2024
eae74a3
dont return self for peers and add erroring to new group creation
nplasterer Jan 30, 2024
2f4abdc
update the syncing in the tests
nplasterer Jan 30, 2024
76bfcc7
remove all the streams work and move to another PR
nplasterer Jan 30, 2024
cbfdce1
Revert "remove all the streams work and move to another PR"
nplasterer Jan 30, 2024
a659c96
Merge branch 'main' of https://github.com/xmtp/xmtp-android into np/g…
nplasterer Feb 1, 2024
9b5d62d
update test
nplasterer Feb 1, 2024
fc86eed
undo all the bad merge items
nplasterer Feb 1, 2024
457331b
attempt at a few different stream techiniques
nplasterer Feb 2, 2024
cda4f13
get message streaming working and start on group streaming
nplasterer Feb 2, 2024
769c270
remove unneeded class
nplasterer Feb 2, 2024
dfdc218
get group streaming working
nplasterer Feb 2, 2024
d54521b
fix streaming for both
nplasterer Feb 2, 2024
be8a3d8
fix the linter
nplasterer Feb 2, 2024
28cd0c6
throw error if client doesnt support groups
nplasterer Feb 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class MainViewModel : ViewModel() {
val stream: StateFlow<MainListItem?> =
stateFlow(viewModelScope, null) { subscriptionCount ->
if (ClientManager.clientState.value is ClientManager.ClientState.Ready) {
ClientManager.client.conversations.stream()
ClientManager.client.conversations.streamAll()
.flowWhileShared(
subscriptionCount,
SharingStarted.WhileSubscribed(1000L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package org.xmtp.android.library

import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.platform.app.InstrumentationRegistry
import app.cash.turbine.test
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertThrows
Expand Down Expand Up @@ -197,4 +199,56 @@ class GroupTest {
assertEquals(ReactionAction.Added, content?.action)
assertEquals(ReactionSchema.Unicode, content?.schema)
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testCanStreamGroupMessages() = kotlinx.coroutines.test.runTest {
val group = boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))

group.streamMessages().test {
group.send("hi")
assertEquals("hi", awaitItem().body)
group.send("hi again")
assertEquals("hi again", awaitItem().body)
}
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testCanStreamDecryptedGroupMessages() = kotlinx.coroutines.test.runTest {
val group = boClient.conversations.newGroup(listOf(alix.walletAddress))

group.streamDecryptedMessages().test {
group.send("hi")
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
group.send("hi again")
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
}
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testCanStreamGroups() = kotlinx.coroutines.test.runTest {
boClient.conversations.streamGroups().test {
val group =
alixClient.conversations.newGroup(listOf(bo.walletAddress))
assertEquals(group.id.toHex(), awaitItem().topic)
val group2 =
caroClient.conversations.newGroup(listOf(bo.walletAddress))
assertEquals(group2.id.toHex(), awaitItem().topic)
}
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testCanStreamGroupsAndConversations() = kotlinx.coroutines.test.runTest {
boClient.conversations.streamAll().test {
val group =
caroClient.conversations.newGroup(listOf(bo.walletAddress))
assertEquals(group.id.toHex(), awaitItem().topic)
val conversation =
boClient.conversations.newConversation(alix.walletAddress)
assertEquals(conversation.topic, awaitItem().topic)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,15 @@ sealed class Conversation {
return when (this) {
is V1 -> conversationV1.streamMessages()
is V2 -> conversationV2.streamMessages()
is Group -> throw XMTPException("Coming follow up PR")
is Group -> group.streamMessages()
}
}

fun streamDecryptedMessages(): Flow<DecryptedMessage> {
return when (this) {
is V1 -> conversationV1.streamDecryptedMessages()
is V2 -> conversationV2.streamDecryptedMessages()
is Group -> throw XMTPException("Coming follow up PR")
is Group -> group.streamDecryptedMessages()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package org.xmtp.android.library
import android.util.Log
import io.grpc.StatusException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
Expand All @@ -32,7 +35,9 @@ import org.xmtp.android.library.messages.walletAddress
import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData
import org.xmtp.proto.message.contents.Contact
import org.xmtp.proto.message.contents.Invitation
import uniffi.xmtpv3.FfiConversationCallback
import uniffi.xmtpv3.FfiConversations
import uniffi.xmtpv3.FfiGroup
import uniffi.xmtpv3.FfiListConversationsOptions
import java.util.Date
import kotlin.time.Duration.Companion.nanoseconds
Expand Down Expand Up @@ -477,7 +482,6 @@ data class Conversations(
client.subscribeTopic(
listOf(Topic.userIntro(client.address), Topic.userInvite(client.address)),
).collect { envelope ->

if (envelope.contentTopic == Topic.userIntro(client.address).description) {
val conversationV1 = fromIntro(envelope = envelope)
if (!streamedConversationTopics.contains(conversationV1.topic)) {
Expand All @@ -496,6 +500,21 @@ data class Conversations(
}
}

fun streamAll(): Flow<Conversation> {
return merge(streamGroups(), stream())
}

fun streamGroups(): Flow<Conversation> = callbackFlow {
val groupCallback = object : FfiConversationCallback {
override fun onConversation(conversation: FfiGroup) {
trySend(Conversation.Group(Group(client, conversation)))
}
}
val stream = libXMTPConversations?.stream(groupCallback)
?: throw XMTPException("Client does not support Groups")
awaitClose { stream.end() }
}

/**
* Get the stream of all messages of the current [Client]
* @return Flow object of [DecodedMessage] that represents all the messages of the
Expand Down
29 changes: 28 additions & 1 deletion library/src/main/java/org/xmtp/android/library/Group.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.xmtp.android.library

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.codecs.ContentCodec
import org.xmtp.android.library.codecs.EncodedContent
Expand All @@ -10,6 +13,8 @@ import org.xmtp.android.library.messages.PagingInfoSortDirection
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
import uniffi.xmtpv3.FfiGroup
import uniffi.xmtpv3.FfiListMessagesOptions
import uniffi.xmtpv3.FfiMessage
import uniffi.xmtpv3.FfiMessageCallback
import java.util.Date
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.DurationUnit
Expand All @@ -34,7 +39,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
runBlocking {
libXMTPGroup.send(contentBytes = encodedContent.toByteArray())
}
return id.toString()
return id.toHex()
}

fun <T> prepareMessage(content: T, options: SendOptions?): EncodedContent {
Expand Down Expand Up @@ -136,4 +141,26 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
libXMTPGroup.listMembers().map { it.accountAddress }
}
}

fun streamMessages(): Flow<DecodedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(Message(client, message).decode())
}
}

val stream = libXMTPGroup.stream(messageCallback)
awaitClose { stream.end() }
}

fun streamDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(decrypt(Message(client, message)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to refer to what you're doing in here as decoding rather than decrypting. The messages all get decrypted in libxmtp.

Copy link
Contributor Author

@nplasterer nplasterer Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we have two streaming methods one for decoding and one for decrypting. We need this stream decrypted messages for React Native custom content types. @nakajima could speak more to it.

Since Groups are conforming to the current Conversation model I have to use these methods. Once we do the refactor to Groups Dms and Conversation parent we might be able to get rid of some of these things.

But to break as little of the existing flow as possible. This is needed.

Copy link
Contributor

@richardhuaaa richardhuaaa Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the intended difference between a DecryptedMessage and a DecodedMessage? When I trace the method calls from both streaming methods, they seem to produce the same result (they both return something with a field called encodedContent that is set to EncodedContent.parseFrom(libXMTPMessage.content))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is going to be any difference in the V3 world but again I didn't follow as closely to the reasoning for custom contentTypes. I found this for context xmtp/xmtp-ios#196

}
}

val stream = libXMTPGroup.stream(messageCallback)
awaitClose { stream.end() }
}
}
Loading