Skip to content

Commit

Permalink
Decrypted message streaming (#139)
Browse files Browse the repository at this point in the history
* add decrypt streaming functions

* add the stream all decrypted message function as well

* remove unused imports

* remove the bad package name

* fix up the linter error
  • Loading branch information
nplasterer authored Nov 28, 2023
1 parent 65a3ed1 commit e5b8995
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,30 @@ class ConversationTest {
assertTrue(isSteveOrBobConversation(messages[2].topic))
}

@Test
fun testListBatchDecryptedMessages() {
val bobConversation = aliceClient.conversations.newConversation(bob.walletAddress)
val steveConversation =
aliceClient.conversations.newConversation(fixtures.steve.walletAddress)

bobConversation.send(text = "hey alice 1")
bobConversation.send(text = "hey alice 2")
steveConversation.send(text = "hey alice 3")
val messages = aliceClient.conversations.listBatchDecryptedMessages(
listOf(
Pair(steveConversation.topic, null),
Pair(bobConversation.topic, null),
),
)
val isSteveOrBobConversation = { topic: String ->
(topic.equals(steveConversation.topic) || topic.equals(bobConversation.topic))
}
assertEquals(3, messages.size)
assertTrue(isSteveOrBobConversation(messages[0].topic))
assertTrue(isSteveOrBobConversation(messages[1].topic))
assertTrue(isSteveOrBobConversation(messages[2].topic))
}

@Test
fun testListBatchMessagesWithPagination() {
val bobConversation = aliceClient.conversations.newConversation(bob.walletAddress)
Expand Down
2 changes: 2 additions & 0 deletions library/src/main/java/org/xmtp/android/library/.editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[*.{kt,kts}]
disabled_rules = import-ordering
18 changes: 17 additions & 1 deletion library/src/main/java/org/xmtp/android/library/Conversation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
import org.xmtp.proto.message.contents.Invitation
import org.xmtp.proto.message.contents.Invitation.InvitationV1.Aes256gcmHkdfsha256
import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.messages.DecryptedMessage
import java.util.Date

sealed class Conversation {
Expand Down Expand Up @@ -193,6 +193,15 @@ sealed class Conversation {
}
}

fun decrypt(
envelope: Envelope,
): DecryptedMessage {
return when (this) {
is V1 -> conversationV1.decrypt(envelope)
is V2 -> conversationV2.decrypt(envelope)
}
}

val client: Client
get() {
return when (this) {
Expand All @@ -208,6 +217,13 @@ sealed class Conversation {
}
}

fun streamDecryptedMessages(): Flow<DecryptedMessage> {
return when (this) {
is V1 -> conversationV1.streamDecryptedMessages()
is V2 -> conversationV2.streamDecryptedMessages()
}
}

fun streamEphemeral(): Flow<Envelope> {
return when (this) {
is V1 -> return conversationV1.streamEphemeral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.xmtp.android.library.messages.sentAt
import org.xmtp.android.library.messages.toPublicKeyBundle
import org.xmtp.android.library.messages.walletAddress
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.messages.DecryptedMessage
import java.util.Date

data class ConversationV1(
Expand Down Expand Up @@ -233,4 +233,10 @@ data class ConversationV1(
emit(it)
}
}

fun streamDecryptedMessages(): Flow<DecryptedMessage> = flow {
client.subscribe(listOf(topic.description)).collect {
emit(decrypt(envelope = it))
}
}
}
28 changes: 19 additions & 9 deletions library/src/main/java/org/xmtp/android/library/ConversationV2.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.xmtp.android.library.messages.getPublicKeyBundle
import org.xmtp.android.library.messages.walletAddress
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
import org.xmtp.proto.message.contents.Invitation
import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.messages.DecryptedMessage
import java.util.Date

data class ConversationV2(
Expand Down Expand Up @@ -88,17 +88,21 @@ data class ConversationV2(
val envelopes = runBlocking { client.apiClient.envelopes(topic, pagination) }

return envelopes.map { envelope ->
val message = Message.parseFrom(envelope.message)
MessageV2Builder.buildDecrypt(
id = generateId(envelope = envelope),
topic,
message.v2,
keyMaterial,
client
)
decrypt(envelope)
}
}

fun decrypt(envelope: Envelope): DecryptedMessage {
val message = Message.parseFrom(envelope.message)
return MessageV2Builder.buildDecrypt(
id = generateId(envelope = envelope),
topic,
message.v2,
keyMaterial,
client
)
}

fun streamMessages(): Flow<DecodedMessage> = flow {
client.subscribe(listOf(topic)).mapNotNull { decodeEnvelopeOrNull(envelope = it) }.collect {
emit(it)
Expand Down Expand Up @@ -219,4 +223,10 @@ data class ConversationV2(
emit(it)
}
}

fun streamDecryptedMessages(): Flow<DecryptedMessage> = flow {
client.subscribe(listOf(topic)).collect {
emit(decrypt(envelope = it))
}
}
}
87 changes: 87 additions & 0 deletions library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.xmtp.android.library.messages.walletAddress
import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData
import org.xmtp.proto.message.contents.Contact
import org.xmtp.proto.message.contents.Invitation
import org.xmtp.android.library.messages.DecryptedMessage
import java.util.Date

data class Conversations(
Expand Down Expand Up @@ -307,6 +308,37 @@ data class Conversations(
return messages
}

fun listBatchDecryptedMessages(
topics: List<Pair<String, Pagination?>>,
): List<DecryptedMessage> {
val requests = topics.map { (topic, page) ->
makeQueryRequest(topic = topic, pagination = page)
}

// The maximum number of requests permitted in a single batch call.
val maxQueryRequestsPerBatch = 50
val messages: MutableList<DecryptedMessage> = mutableListOf()
val batches = requests.chunked(maxQueryRequestsPerBatch)
for (batch in batches) {
runBlocking {
messages.addAll(
client.batchQuery(batch).responsesOrBuilderList.flatMap { res ->
res.envelopesList.mapNotNull { envelope ->
val conversation = conversationsByTopic[envelope.contentTopic]
if (conversation == null) {
Log.d(TAG, "discarding message, unknown conversation $envelope")
return@mapNotNull null
}
val msg = conversation.decrypt(envelope)
msg
}
}
)
}
}
return messages
}

fun sendInvitation(
recipient: SignedPublicKeyBundle,
invitation: InvitationV1,
Expand Down Expand Up @@ -423,4 +455,59 @@ data class Conversations(
}
}
}

fun streamAllDecryptedMessages(): Flow<DecryptedMessage> = flow {
val topics = mutableListOf(
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description
)

for (conversation in list()) {
topics.add(conversation.topic)
}

val subscribeFlow = MutableStateFlow(makeSubscribeRequest(topics))

while (true) {
try {
client.subscribe2(request = subscribeFlow).collect { envelope ->
when {
conversationsByTopic.containsKey(envelope.contentTopic) -> {
val conversation = conversationsByTopic[envelope.contentTopic]
val decrypted = conversation?.decrypt(envelope)
decrypted?.let { emit(it) }
}

envelope.contentTopic.startsWith("/xmtp/0/invite-") -> {
val conversation = fromInvite(envelope = envelope)
conversationsByTopic[conversation.topic] = conversation
topics.add(conversation.topic)
subscribeFlow.value = makeSubscribeRequest(topics)
}

envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
val conversation = fromIntro(envelope = envelope)
conversationsByTopic[conversation.topic] = conversation
val decrypted = conversation.decrypt(envelope)
emit(decrypted)
topics.add(conversation.topic)
subscribeFlow.value = makeSubscribeRequest(topics)
}

else -> {}
}
}
} catch (error: CancellationException) {
break
} catch (error: StatusException) {
if (error.status.code == io.grpc.Status.Code.UNAVAILABLE) {
continue
} else {
break
}
} catch (error: Exception) {
continue
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uniffi.xmtp_dh.org.xmtp.android.library.messages
package org.xmtp.android.library.messages

import org.xmtp.android.library.codecs.EncodedContent
import java.util.Date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import org.xmtp.android.library.DecodedMessage
import org.xmtp.android.library.KeyUtil
import org.xmtp.android.library.XMTPException
import org.xmtp.android.library.codecs.EncodedContent
import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage
import java.math.BigInteger
import java.util.Date

Expand Down

0 comments on commit e5b8995

Please sign in to comment.