From 3e6bf5d2f931ebd2c721e0574eaaf1efe739a3e5 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Fri, 29 Sep 2023 17:07:34 -0700 Subject: [PATCH 1/6] fix: stream all messages --- .../xmtp/android/library/ConversationsTest.kt | 69 +++++++++++++++++++ .../org/xmtp/android/library/Conversations.kt | 35 ++++++++-- 2 files changed, 97 insertions(+), 7 deletions(-) diff --git a/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt b/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt index 998fee944..8ffe2f9be 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt @@ -1,6 +1,12 @@ package org.xmtp.android.library +import android.util.Log import androidx.test.ext.junit.runners.AndroidJUnit4 +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext import org.junit.Assert.assertEquals import org.junit.Test import org.junit.runner.RunWith @@ -16,6 +22,7 @@ import org.xmtp.android.library.messages.createDeterministic import org.xmtp.android.library.messages.getPublicKeyBundle import org.xmtp.android.library.messages.toPublicKeyBundle import org.xmtp.android.library.messages.walletAddress +import java.lang.Thread.sleep import java.util.Date @RunWith(AndroidJUnit4::class) @@ -71,4 +78,66 @@ class ConversationsTest { assertEquals(conversation.peerAddress, newWallet.address) assertEquals(conversation.createdAt.time, created.time) } + + @Test + fun testStreamAllMessages() = runBlocking { + val bo = PrivateKeyBuilder() + val alix = PrivateKeyBuilder() + val clientOptions = + ClientOptions(api = ClientOptions.Api(env = XMTPEnvironment.LOCAL, isSecure = false)) + val boClient = Client().create(bo, clientOptions) + val alixClient = Client().create(alix, clientOptions) + val boConversation = boClient.conversations.newConversation(alixClient.address) + + // Record message stream across all conversations + val allMessages = mutableListOf() + + val job = CoroutineScope(Dispatchers.IO).launch { + try { + alixClient.conversations.streamAllMessages().collect { message -> + allMessages.add(message) + } + } catch (e: Exception) { + + } + } + sleep(5000) + + for (i in 0 until 5) { + boConversation.send(text = "Message $i") + sleep(1000) + } + assertEquals(allMessages.size, 5) + + val caro = PrivateKeyBuilder() + val caroClient = Client().create(caro, clientOptions) + val caroConversation = caroClient.conversations.newConversation(alixClient.address) + + for (i in 0 until 5) { + caroConversation.send(text = "Message $i") + sleep(1000) + } + + assertEquals(allMessages.size, 10) + + job.cancel() + + CoroutineScope(Dispatchers.IO).launch { + try { + alixClient.conversations.streamAllMessages().collect { message -> + allMessages.add(message) + } + } catch (e: Exception) { + } + } + sleep(5000) + + for (i in 0 until 5) { + boConversation.send(text = "Message $i") + sleep(1000) + } + + assertEquals(allMessages.size, 15) + } + } diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index 3ef969b96..df570c5ca 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -1,8 +1,24 @@ package org.xmtp.android.library import android.util.Log +import com.google.common.collect.Collections2.transform +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.cancellable +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.coroutineContext +import kotlinx.coroutines.flow.flatMapConcat import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapLatest +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.flow.transform +import kotlinx.coroutines.job +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest import org.xmtp.android.library.messages.Envelope @@ -25,6 +41,7 @@ import org.xmtp.android.library.messages.sentAt import org.xmtp.android.library.messages.toSignedPublicKeyBundle import org.xmtp.android.library.messages.walletAddress import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData +import org.xmtp.proto.message.api.v1.envelope import org.xmtp.proto.message.contents.Contact import org.xmtp.proto.message.contents.Invitation import java.util.Date @@ -374,25 +391,29 @@ data class Conversations( } envelope.contentTopic.startsWith("/xmtp/0/invite-") -> { - val conversation = fromInvite(envelope) + val conversation = fromInvite(envelope = envelope) conversationsByTopic[conversation.topic] = conversation - // Break so we can resubscribe with the new conversation - return@collect + topics.add(conversation.topic) + currentCoroutineContext().job.cancel() } envelope.contentTopic.startsWith("/xmtp/0/intro-") -> { - val conversation = fromIntro(envelope) + val conversation = fromIntro(envelope = envelope) conversationsByTopic[conversation.topic] = conversation val decoded = conversation.decode(envelope) emit(decoded) - // Break so we can resubscribe with the new conversation - return@collect + topics.add(conversation.topic) + currentCoroutineContext().job.cancel() + } + + else -> { } } } } catch (error: Exception) { - throw error + continue } + delay(500) } } } From b5163648a345725dcc374c9414e8eafaad94d614 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Fri, 29 Sep 2023 17:11:42 -0700 Subject: [PATCH 2/6] fix up the lint issues --- .../xmtp/android/library/ConversationsTest.kt | 13 +++++-------- .../org/xmtp/android/library/Conversations.kt | 19 +------------------ 2 files changed, 6 insertions(+), 26 deletions(-) diff --git a/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt b/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt index 8ffe2f9be..ee77e358f 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt @@ -1,12 +1,10 @@ package org.xmtp.android.library -import android.util.Log import androidx.test.ext.junit.runners.AndroidJUnit4 import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext import org.junit.Assert.assertEquals import org.junit.Test import org.junit.runner.RunWith @@ -97,11 +95,9 @@ class ConversationsTest { alixClient.conversations.streamAllMessages().collect { message -> allMessages.add(message) } - } catch (e: Exception) { - - } + } catch (e: Exception) {} } - sleep(5000) + sleep(2500) for (i in 0 until 5) { boConversation.send(text = "Message $i") @@ -113,6 +109,8 @@ class ConversationsTest { val caroClient = Client().create(caro, clientOptions) val caroConversation = caroClient.conversations.newConversation(alixClient.address) + sleep(2500) + for (i in 0 until 5) { caroConversation.send(text = "Message $i") sleep(1000) @@ -130,7 +128,7 @@ class ConversationsTest { } catch (e: Exception) { } } - sleep(5000) + sleep(2500) for (i in 0 until 5) { boConversation.send(text = "Message $i") @@ -139,5 +137,4 @@ class ConversationsTest { assertEquals(allMessages.size, 15) } - } diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index df570c5ca..83a291e05 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -1,24 +1,10 @@ package org.xmtp.android.library import android.util.Log -import com.google.common.collect.Collections2.transform -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job import kotlinx.coroutines.currentCoroutineContext -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.cancellable -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.coroutineContext -import kotlinx.coroutines.flow.flatMapConcat import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.mapLatest -import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.flow.transform import kotlinx.coroutines.job -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest import org.xmtp.android.library.messages.Envelope @@ -41,7 +27,6 @@ import org.xmtp.android.library.messages.sentAt import org.xmtp.android.library.messages.toSignedPublicKeyBundle import org.xmtp.android.library.messages.walletAddress import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData -import org.xmtp.proto.message.api.v1.envelope import org.xmtp.proto.message.contents.Contact import org.xmtp.proto.message.contents.Invitation import java.util.Date @@ -406,14 +391,12 @@ data class Conversations( currentCoroutineContext().job.cancel() } - else -> { - } + else -> {} } } } catch (error: Exception) { continue } - delay(500) } } } From 80b33bcfd370bcf6bbd389cb9ae0dbd1e5301443 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Fri, 29 Sep 2023 17:17:04 -0700 Subject: [PATCH 3/6] replaced with the new test --- .../org/xmtp/android/library/ConversationTest.kt | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt b/library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt index deb667c8c..1e0e3fe6f 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt @@ -557,18 +557,6 @@ class ConversationTest { } } - @Test - fun testStreamAllMessagesGetsMessageFromKnownConversation() = kotlinx.coroutines.test.runTest { - val fixtures = fixtures() - val client = fixtures.aliceClient - val bobConversation = fixtures.bobClient.conversations.newConversation(client.address) - client.conversations.streamAllMessages().test { - bobConversation.send(text = "hi") - assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8()) - awaitComplete() - } - } - @Test fun testV2RejectsSpoofedContactBundles() { val topic = "/xmtp/0/m-Gdb7oj5nNdfZ3MJFLAcS4WTABgr6al1hePy6JV1-QUE/proto" From bf249ef8973a69b4764f40435610fa134b91ebbb Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Fri, 29 Sep 2023 17:20:16 -0700 Subject: [PATCH 4/6] move it out of the while for performance --- dev/local/docker-compose.yml | 10 +++++++--- .../org/xmtp/android/library/Conversations.kt | 17 ++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/dev/local/docker-compose.yml b/dev/local/docker-compose.yml index 50999b773..e9563186b 100644 --- a/dev/local/docker-compose.yml +++ b/dev/local/docker-compose.yml @@ -5,9 +5,13 @@ services: environment: - GOWAKU-NODEKEY=8a30dcb604b0b53627a5adc054dbf434b446628d4bd1eccc681d223f0550ce67 command: - - --store.enable - - --store.db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable - - --store.reader-db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable + - --ws + - --store + - --message-db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable + - --message-db-reader-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable + - --lightpush + - --filter + - --ws-port=9001 - --wait-for-db=30s - --api.authn.enable ports: diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index 83a291e05..84fbb88d8 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -356,16 +356,15 @@ data class Conversations( } fun streamAllMessages(): Flow = flow { - while (true) { - val topics = mutableListOf( - Topic.userInvite(client.address).description, - Topic.userIntro(client.address).description - ) - - for (conversation in list()) { - topics.add(conversation.topic) - } + val topics = mutableListOf( + Topic.userInvite(client.address).description, + Topic.userIntro(client.address).description + ) + for (conversation in list()) { + topics.add(conversation.topic) + } + while (true) { try { client.subscribe(topics = topics).collect { envelope -> when { From e72d782140417d44877ab66c822076b79486cda8 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Fri, 29 Sep 2023 17:20:55 -0700 Subject: [PATCH 5/6] Update docker-compose.yml --- dev/local/docker-compose.yml | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/dev/local/docker-compose.yml b/dev/local/docker-compose.yml index e9563186b..197ba1d63 100644 --- a/dev/local/docker-compose.yml +++ b/dev/local/docker-compose.yml @@ -5,13 +5,9 @@ services: environment: - GOWAKU-NODEKEY=8a30dcb604b0b53627a5adc054dbf434b446628d4bd1eccc681d223f0550ce67 command: - - --ws - - --store - - --message-db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable - - --message-db-reader-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable - - --lightpush - - --filter - - --ws-port=9001 + - --store.enable + - --store.db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable + - --store.reader-db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable - --wait-for-db=30s - --api.authn.enable ports: @@ -34,4 +30,4 @@ services: depends_on: wakunode: condition: service_healthy - build: ./test \ No newline at end of file + build: ./test From 2f9089a6449d6dca935ce581a01063bca7a180e0 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Fri, 29 Sep 2023 17:26:36 -0700 Subject: [PATCH 6/6] work around no longer needed --- .github/workflows/test.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index af90bcdf3..352f9428a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -44,12 +44,6 @@ jobs: uses: gradle/gradle-build-action@v2 - name: Validate Gradle Wrapper uses: gradle/wrapper-validation-action@v1 - # Workaround for https://github.com/actions/runner-images/issues/8104 - - name: Fix Qemu Error - run: | - brew remove --ignore-dependencies qemu - curl -o ./qemu.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/f88e30b3a23ef3735580f9b05535ce5a0a03c9e3/Formula/qemu.rb - brew install ./qemu.rb - name: Set up Docker run: brew install docker docker-compose - name: Start Colima