From e5b8995954e6bf81b429a562677f43a303e65a49 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 28 Nov 2023 09:14:14 -0800 Subject: [PATCH] Decrypted message streaming (#139) * add decrypt streaming functions * add the stream all decrypted message function as well * remove unused imports * remove the bad package name * fix up the linter error --- .../xmtp/android/library/ConversationTest.kt | 24 +++++ .../org/xmtp/android/library/.editorconfig | 2 + .../org/xmtp/android/library/Conversation.kt | 18 +++- .../xmtp/android/library/ConversationV1.kt | 8 +- .../xmtp/android/library/ConversationV2.kt | 28 ++++-- .../org/xmtp/android/library/Conversations.kt | 87 +++++++++++++++++++ .../library/messages/DecryptedMessage.kt | 2 +- .../android/library/messages/MessageV2.kt | 1 - 8 files changed, 157 insertions(+), 13 deletions(-) create mode 100644 library/src/main/java/org/xmtp/android/library/.editorconfig diff --git a/library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt b/library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt index d669290bd..f6ff1ddea 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt @@ -448,6 +448,30 @@ class ConversationTest { assertTrue(isSteveOrBobConversation(messages[2].topic)) } + @Test + fun testListBatchDecryptedMessages() { + val bobConversation = aliceClient.conversations.newConversation(bob.walletAddress) + val steveConversation = + aliceClient.conversations.newConversation(fixtures.steve.walletAddress) + + bobConversation.send(text = "hey alice 1") + bobConversation.send(text = "hey alice 2") + steveConversation.send(text = "hey alice 3") + val messages = aliceClient.conversations.listBatchDecryptedMessages( + listOf( + Pair(steveConversation.topic, null), + Pair(bobConversation.topic, null), + ), + ) + val isSteveOrBobConversation = { topic: String -> + (topic.equals(steveConversation.topic) || topic.equals(bobConversation.topic)) + } + assertEquals(3, messages.size) + assertTrue(isSteveOrBobConversation(messages[0].topic)) + assertTrue(isSteveOrBobConversation(messages[1].topic)) + assertTrue(isSteveOrBobConversation(messages[2].topic)) + } + @Test fun testListBatchMessagesWithPagination() { val bobConversation = aliceClient.conversations.newConversation(bob.walletAddress) diff --git a/library/src/main/java/org/xmtp/android/library/.editorconfig b/library/src/main/java/org/xmtp/android/library/.editorconfig new file mode 100644 index 000000000..c31b521f4 --- /dev/null +++ b/library/src/main/java/org/xmtp/android/library/.editorconfig @@ -0,0 +1,2 @@ +[*.{kt,kts}] +disabled_rules = import-ordering \ No newline at end of file diff --git a/library/src/main/java/org/xmtp/android/library/Conversation.kt b/library/src/main/java/org/xmtp/android/library/Conversation.kt index 08acd50a9..af0324840 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversation.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversation.kt @@ -10,7 +10,7 @@ import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData import org.xmtp.proto.message.api.v1.MessageApiOuterClass import org.xmtp.proto.message.contents.Invitation import org.xmtp.proto.message.contents.Invitation.InvitationV1.Aes256gcmHkdfsha256 -import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage +import org.xmtp.android.library.messages.DecryptedMessage import java.util.Date sealed class Conversation { @@ -193,6 +193,15 @@ sealed class Conversation { } } + fun decrypt( + envelope: Envelope, + ): DecryptedMessage { + return when (this) { + is V1 -> conversationV1.decrypt(envelope) + is V2 -> conversationV2.decrypt(envelope) + } + } + val client: Client get() { return when (this) { @@ -208,6 +217,13 @@ sealed class Conversation { } } + fun streamDecryptedMessages(): Flow { + return when (this) { + is V1 -> conversationV1.streamDecryptedMessages() + is V2 -> conversationV2.streamDecryptedMessages() + } + } + fun streamEphemeral(): Flow { return when (this) { is V1 -> return conversationV1.streamEphemeral() diff --git a/library/src/main/java/org/xmtp/android/library/ConversationV1.kt b/library/src/main/java/org/xmtp/android/library/ConversationV1.kt index 23ae829a6..296f90bb3 100644 --- a/library/src/main/java/org/xmtp/android/library/ConversationV1.kt +++ b/library/src/main/java/org/xmtp/android/library/ConversationV1.kt @@ -22,7 +22,7 @@ import org.xmtp.android.library.messages.sentAt import org.xmtp.android.library.messages.toPublicKeyBundle import org.xmtp.android.library.messages.walletAddress import org.xmtp.proto.message.api.v1.MessageApiOuterClass -import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage +import org.xmtp.android.library.messages.DecryptedMessage import java.util.Date data class ConversationV1( @@ -233,4 +233,10 @@ data class ConversationV1( emit(it) } } + + fun streamDecryptedMessages(): Flow = flow { + client.subscribe(listOf(topic.description)).collect { + emit(decrypt(envelope = it)) + } + } } diff --git a/library/src/main/java/org/xmtp/android/library/ConversationV2.kt b/library/src/main/java/org/xmtp/android/library/ConversationV2.kt index ededf0410..4ae5bf1b5 100644 --- a/library/src/main/java/org/xmtp/android/library/ConversationV2.kt +++ b/library/src/main/java/org/xmtp/android/library/ConversationV2.kt @@ -21,7 +21,7 @@ import org.xmtp.android.library.messages.getPublicKeyBundle import org.xmtp.android.library.messages.walletAddress import org.xmtp.proto.message.api.v1.MessageApiOuterClass import org.xmtp.proto.message.contents.Invitation -import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage +import org.xmtp.android.library.messages.DecryptedMessage import java.util.Date data class ConversationV2( @@ -88,17 +88,21 @@ data class ConversationV2( val envelopes = runBlocking { client.apiClient.envelopes(topic, pagination) } return envelopes.map { envelope -> - val message = Message.parseFrom(envelope.message) - MessageV2Builder.buildDecrypt( - id = generateId(envelope = envelope), - topic, - message.v2, - keyMaterial, - client - ) + decrypt(envelope) } } + fun decrypt(envelope: Envelope): DecryptedMessage { + val message = Message.parseFrom(envelope.message) + return MessageV2Builder.buildDecrypt( + id = generateId(envelope = envelope), + topic, + message.v2, + keyMaterial, + client + ) + } + fun streamMessages(): Flow = flow { client.subscribe(listOf(topic)).mapNotNull { decodeEnvelopeOrNull(envelope = it) }.collect { emit(it) @@ -219,4 +223,10 @@ data class ConversationV2( emit(it) } } + + fun streamDecryptedMessages(): Flow = flow { + client.subscribe(listOf(topic)).collect { + emit(decrypt(envelope = it)) + } + } } diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index b3629dc15..2cef2630f 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -31,6 +31,7 @@ import org.xmtp.android.library.messages.walletAddress import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData import org.xmtp.proto.message.contents.Contact import org.xmtp.proto.message.contents.Invitation +import org.xmtp.android.library.messages.DecryptedMessage import java.util.Date data class Conversations( @@ -307,6 +308,37 @@ data class Conversations( return messages } + fun listBatchDecryptedMessages( + topics: List>, + ): List { + val requests = topics.map { (topic, page) -> + makeQueryRequest(topic = topic, pagination = page) + } + + // The maximum number of requests permitted in a single batch call. + val maxQueryRequestsPerBatch = 50 + val messages: MutableList = mutableListOf() + val batches = requests.chunked(maxQueryRequestsPerBatch) + for (batch in batches) { + runBlocking { + messages.addAll( + client.batchQuery(batch).responsesOrBuilderList.flatMap { res -> + res.envelopesList.mapNotNull { envelope -> + val conversation = conversationsByTopic[envelope.contentTopic] + if (conversation == null) { + Log.d(TAG, "discarding message, unknown conversation $envelope") + return@mapNotNull null + } + val msg = conversation.decrypt(envelope) + msg + } + } + ) + } + } + return messages + } + fun sendInvitation( recipient: SignedPublicKeyBundle, invitation: InvitationV1, @@ -423,4 +455,59 @@ data class Conversations( } } } + + fun streamAllDecryptedMessages(): Flow = flow { + val topics = mutableListOf( + Topic.userInvite(client.address).description, + Topic.userIntro(client.address).description + ) + + for (conversation in list()) { + topics.add(conversation.topic) + } + + val subscribeFlow = MutableStateFlow(makeSubscribeRequest(topics)) + + while (true) { + try { + client.subscribe2(request = subscribeFlow).collect { envelope -> + when { + conversationsByTopic.containsKey(envelope.contentTopic) -> { + val conversation = conversationsByTopic[envelope.contentTopic] + val decrypted = conversation?.decrypt(envelope) + decrypted?.let { emit(it) } + } + + envelope.contentTopic.startsWith("/xmtp/0/invite-") -> { + val conversation = fromInvite(envelope = envelope) + conversationsByTopic[conversation.topic] = conversation + topics.add(conversation.topic) + subscribeFlow.value = makeSubscribeRequest(topics) + } + + envelope.contentTopic.startsWith("/xmtp/0/intro-") -> { + val conversation = fromIntro(envelope = envelope) + conversationsByTopic[conversation.topic] = conversation + val decrypted = conversation.decrypt(envelope) + emit(decrypted) + topics.add(conversation.topic) + subscribeFlow.value = makeSubscribeRequest(topics) + } + + else -> {} + } + } + } catch (error: CancellationException) { + break + } catch (error: StatusException) { + if (error.status.code == io.grpc.Status.Code.UNAVAILABLE) { + continue + } else { + break + } + } catch (error: Exception) { + continue + } + } + } } diff --git a/library/src/main/java/org/xmtp/android/library/messages/DecryptedMessage.kt b/library/src/main/java/org/xmtp/android/library/messages/DecryptedMessage.kt index c4a73fada..c2b2d4ef5 100644 --- a/library/src/main/java/org/xmtp/android/library/messages/DecryptedMessage.kt +++ b/library/src/main/java/org/xmtp/android/library/messages/DecryptedMessage.kt @@ -1,4 +1,4 @@ -package uniffi.xmtp_dh.org.xmtp.android.library.messages +package org.xmtp.android.library.messages import org.xmtp.android.library.codecs.EncodedContent import java.util.Date diff --git a/library/src/main/java/org/xmtp/android/library/messages/MessageV2.kt b/library/src/main/java/org/xmtp/android/library/messages/MessageV2.kt index 50185f05f..37b819ed7 100644 --- a/library/src/main/java/org/xmtp/android/library/messages/MessageV2.kt +++ b/library/src/main/java/org/xmtp/android/library/messages/MessageV2.kt @@ -11,7 +11,6 @@ import org.xmtp.android.library.DecodedMessage import org.xmtp.android.library.KeyUtil import org.xmtp.android.library.XMTPException import org.xmtp.android.library.codecs.EncodedContent -import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage import java.math.BigInteger import java.util.Date