Skip to content

Commit

Permalink
feat: streaming and threading improvements (#191)
Browse files Browse the repository at this point in the history
* bump the binaries

* make all send functions suspend

* fix up the tests to not lock on send

* Revert "make all send functions suspend"

This reverts commit e4152d8.

* Revert "Revert "make all send functions suspend""

This reverts commit 125d1ac.

* fix lots of the threading issues

* fix up the linter errors

* bump to the latest rust library

* make delay longer
  • Loading branch information
nplasterer authored Mar 14, 2024
1 parent 0e1e0bc commit 0134852
Show file tree
Hide file tree
Showing 23 changed files with 708 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.xmtp.android.library

import androidx.test.ext.junit.runners.AndroidJUnit4
import com.google.protobuf.kotlin.toByteStringUtf8
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
Expand All @@ -27,10 +28,12 @@ class AttachmentTest {
val aliceConversation =
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)

aliceConversation.send(
content = attachment,
options = SendOptions(contentType = ContentTypeAttachment),
)
runBlocking {
aliceConversation.send(
content = attachment,
options = SendOptions(contentType = ContentTypeAttachment),
)
}
val messages = aliceConversation.messages()
assertEquals(messages.size, 1)
if (messages.size == 1) {
Expand Down
41 changes: 25 additions & 16 deletions library/src/androidTest/java/org/xmtp/android/library/CodecTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.xmtp.android.library

import androidx.test.ext.junit.runners.AndroidJUnit4
import com.google.protobuf.kotlin.toByteStringUtf8
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
Expand Down Expand Up @@ -60,10 +61,12 @@ class CodecTest {
val aliceClient = fixtures.aliceClient
val aliceConversation =
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
aliceConversation.send(
content = 3.14,
options = SendOptions(contentType = NumberCodec().contentType),
)
runBlocking {
aliceConversation.send(
content = 3.14,
options = SendOptions(contentType = NumberCodec().contentType),
)
}
val messages = aliceConversation.messages()
assertEquals(messages.size, 1)
if (messages.size == 1) {
Expand All @@ -82,10 +85,12 @@ class CodecTest {
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
val textContent = TextCodec().encode(content = "hiya")
val source = DecodedComposite(encodedContent = textContent)
aliceConversation.send(
content = source,
options = SendOptions(contentType = CompositeCodec().contentType),
)
runBlocking {
aliceConversation.send(
content = source,
options = SendOptions(contentType = CompositeCodec().contentType),
)
}
val messages = aliceConversation.messages()
val decoded: DecodedComposite? = messages[0].content()
assertEquals("hiya", decoded?.content())
Expand All @@ -107,10 +112,12 @@ class CodecTest {
DecodedComposite(parts = listOf(DecodedComposite(encodedContent = numberContent))),
),
)
aliceConversation.send(
content = source,
options = SendOptions(contentType = CompositeCodec().contentType),
)
runBlocking {
aliceConversation.send(
content = source,
options = SendOptions(contentType = CompositeCodec().contentType),
)
}
val messages = aliceConversation.messages()
val decoded: DecodedComposite? = messages[0].content()
val part1 = decoded!!.parts[0]
Expand All @@ -127,10 +134,12 @@ class CodecTest {
val aliceClient = fixtures.aliceClient!!
val aliceConversation =
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
aliceConversation.send(
content = 3.14,
options = SendOptions(contentType = codec.contentType),
)
runBlocking {
aliceConversation.send(
content = 3.14,
options = SendOptions(contentType = codec.contentType),
)
}
val messages = aliceConversation.messages()
assert(messages.isNotEmpty())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import app.cash.turbine.test
import com.google.protobuf.kotlin.toByteString
import com.google.protobuf.kotlin.toByteStringUtf8
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
Expand Down Expand Up @@ -108,28 +109,30 @@ class ConversationTest {
// Overwrite contact as legacy
bobClient.publishUserContact(legacy = true)
aliceClient.publishUserContact(legacy = true)
bobClient.publish(
envelopes = listOf(
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(bob.walletAddress),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(alice.walletAddress),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.directMessageV1(
bob.walletAddress,
alice.walletAddress,
runBlocking {
bobClient.publish(
envelopes = listOf(
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(bob.walletAddress),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(alice.walletAddress),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.directMessageV1(
bob.walletAddress,
alice.walletAddress,
),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
),
)
)
}
var conversation = aliceClient.conversations.newConversation(bob.walletAddress)
assertEquals(conversation.peerAddress, bob.walletAddress)
assertEquals(conversation.createdAt, someTimeAgo)
Expand Down Expand Up @@ -173,8 +176,8 @@ class ConversationTest {
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)

bobConversation.send(content = "hey alice")
bobConversation.send(content = "hey alice again")
runBlocking { bobConversation.send(content = "hey alice") }
runBlocking { bobConversation.send(content = "hey alice again") }
val messages = aliceConversation.messages()
assertEquals(2, messages.size)
assertEquals("hey alice", messages[1].body)
Expand All @@ -192,7 +195,7 @@ class ConversationTest {
bobWallet.address,
InvitationV1ContextBuilder.buildFromConversation("hi"),
)
bobConversation.send(content = "hey alice")
runBlocking { bobConversation.send(content = "hey alice") }
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals("hey alice", messages[0].body)
Expand Down Expand Up @@ -248,9 +251,10 @@ class ConversationTest {
val tamperedEnvelope = EnvelopeBuilder.buildFromString(
topic = aliceConversation.topic,
timestamp = Date(),
message = MessageBuilder.buildFromMessageV2(v2 = tamperedMessage.messageV2).toByteArray(),
message = MessageBuilder.buildFromMessageV2(v2 = tamperedMessage.messageV2)
.toByteArray(),
)
aliceClient.publish(envelopes = listOf(tamperedEnvelope))
runBlocking { aliceClient.publish(envelopes = listOf(tamperedEnvelope)) }
val bobConversation = bobClient.conversations.newConversation(
aliceWallet.address,
InvitationV1ContextBuilder.buildFromConversation("hi"),
Expand All @@ -268,10 +272,12 @@ class ConversationTest {
fixtures.publishLegacyContact(client = aliceClient)
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
bobConversation.send(
text = MutableList(1000) { "A" }.toString(),
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
)
runBlocking {
bobConversation.send(
text = MutableList(1000) { "A" }.toString(),
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
)
}
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].content())
Expand All @@ -283,10 +289,12 @@ class ConversationTest {
fixtures.publishLegacyContact(client = aliceClient)
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
bobConversation.send(
content = MutableList(1000) { "A" }.toString(),
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
)
runBlocking {
bobConversation.send(
content = MutableList(1000) { "A" }.toString(),
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
)
}
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].content())
Expand All @@ -302,10 +310,12 @@ class ConversationTest {
bobWallet.address,
InvitationV1ContextBuilder.buildFromConversation(conversationId = "hi"),
)
bobConversation.send(
text = MutableList(1000) { "A" }.toString(),
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
)
runBlocking {
bobConversation.send(
text = MutableList(1000) { "A" }.toString(),
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
)
}
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].body)
Expand All @@ -322,10 +332,12 @@ class ConversationTest {
bobWallet.address,
InvitationV1ContextBuilder.buildFromConversation(conversationId = "hi"),
)
bobConversation.send(
content = MutableList(1000) { "A" }.toString(),
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
)
runBlocking {
bobConversation.send(
content = MutableList(1000) { "A" }.toString(),
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
)
}
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].body)
Expand Down Expand Up @@ -369,7 +381,7 @@ class ConversationTest {
ConversationV2.create(client = client, invitation = invitationv1, header = header)
assertEquals(fakeContactWallet.address, conversation.peerAddress)

conversation.send(content = "hello world")
runBlocking { conversation.send(content = "hello world") }

val conversationList = client.conversations.list()
val recipientConversation = conversationList.lastOrNull()
Expand Down Expand Up @@ -401,9 +413,9 @@ class ConversationTest {

val date = Date()
date.time = date.time - 1000000
bobConversation.send(text = "hey alice 1", sentAt = date)
bobConversation.send(text = "hey alice 2")
bobConversation.send(text = "hey alice 3")
runBlocking { bobConversation.send(text = "hey alice 1", sentAt = date) }
runBlocking { bobConversation.send(text = "hey alice 2") }
runBlocking { bobConversation.send(text = "hey alice 3") }
val messages = aliceConversation.messages(limit = 1)
assertEquals(1, messages.size)
assertEquals("hey alice 3", messages[0].body)
Expand All @@ -422,9 +434,9 @@ class ConversationTest {
)
val date = Date()
date.time = date.time - 1000000
bobConversation.send(text = "hey alice 1", sentAt = date)
bobConversation.send(text = "hey alice 2")
bobConversation.send(text = "hey alice 3")
runBlocking { bobConversation.send(text = "hey alice 1", sentAt = date) }
runBlocking { bobConversation.send(text = "hey alice 2") }
runBlocking { bobConversation.send(text = "hey alice 3") }
val messages = aliceConversation.messages(limit = 1)
assertEquals(1, messages.size)
assertEquals("hey alice 3", messages[0].body)
Expand All @@ -445,9 +457,9 @@ class ConversationTest {
val steveConversation =
aliceClient.conversations.newConversation(fixtures.caro.walletAddress)

bobConversation.send(text = "hey alice 1")
bobConversation.send(text = "hey alice 2")
steveConversation.send(text = "hey alice 3")
runBlocking { bobConversation.send(text = "hey alice 1") }
runBlocking { bobConversation.send(text = "hey alice 2") }
runBlocking { steveConversation.send(text = "hey alice 3") }
val messages = aliceClient.conversations.listBatchMessages(
listOf(
Pair(steveConversation.topic, null),
Expand All @@ -469,9 +481,9 @@ class ConversationTest {
val steveConversation =
aliceClient.conversations.newConversation(fixtures.caro.walletAddress)

bobConversation.send(text = "hey alice 1")
bobConversation.send(text = "hey alice 2")
steveConversation.send(text = "hey alice 3")
runBlocking { bobConversation.send(text = "hey alice 1") }
runBlocking { bobConversation.send(text = "hey alice 2") }
runBlocking { steveConversation.send(text = "hey alice 3") }
val messages = aliceClient.conversations.listBatchDecryptedMessages(
listOf(
Pair(steveConversation.topic, null),
Expand All @@ -493,16 +505,16 @@ class ConversationTest {
val steveConversation =
aliceClient.conversations.newConversation(fixtures.caro.walletAddress)

bobConversation.send(text = "hey alice 1 bob")
steveConversation.send(text = "hey alice 1 steve")
runBlocking { bobConversation.send(text = "hey alice 1 bob") }
runBlocking { steveConversation.send(text = "hey alice 1 steve") }

Thread.sleep(100)
val date = Date()

bobConversation.send(text = "hey alice 2 bob")
bobConversation.send(text = "hey alice 3 bob")
steveConversation.send(text = "hey alice 2 steve")
steveConversation.send(text = "hey alice 3 steve")
runBlocking { bobConversation.send(text = "hey alice 2 bob") }
runBlocking { bobConversation.send(text = "hey alice 3 bob") }
runBlocking { steveConversation.send(text = "hey alice 2 steve") }
runBlocking { steveConversation.send(text = "hey alice 3 steve") }

val messages = aliceClient.conversations.listBatchMessages(
listOf(
Expand Down Expand Up @@ -645,7 +657,7 @@ class ConversationTest {
assertEquals(conversation.version, Conversation.Version.V1)
val preparedMessage = conversation.prepareMessage(content = "hi")
val messageID = preparedMessage.messageId
conversation.send(prepared = preparedMessage)
runBlocking { conversation.send(prepared = preparedMessage) }
val messages = conversation.messages()
val message = messages[0]
assertEquals("hi", message.body)
Expand All @@ -657,7 +669,7 @@ class ConversationTest {
val conversation = aliceClient.conversations.newConversation(bob.walletAddress)
val preparedMessage = conversation.prepareMessage(content = "hi")
val messageID = preparedMessage.messageId
conversation.send(prepared = preparedMessage)
runBlocking { conversation.send(prepared = preparedMessage) }
val messages = conversation.messages()
val message = messages[0]
assertEquals("hi", message.body)
Expand All @@ -672,7 +684,7 @@ class ConversationTest {

// This does not need the `conversation` to `.publish` the message.
// This simulates a background task publishing all pending messages upon connection.
aliceClient.publish(envelopes = preparedMessage.envelopes)
runBlocking { aliceClient.publish(envelopes = preparedMessage.envelopes) }

val messages = conversation.messages()
val message = messages[0]
Expand Down Expand Up @@ -753,7 +765,7 @@ class ConversationTest {
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
val encodedContent = TextCodec().encode(content = "hi")
bobConversation.send(encodedContent = encodedContent)
runBlocking { bobConversation.send(encodedContent = encodedContent) }
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals("hi", messages[0].content())
Expand All @@ -763,7 +775,7 @@ class ConversationTest {
fun testCanSendEncodedContentV2Message() {
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val encodedContent = TextCodec().encode(content = "hi")
bobConversation.send(encodedContent = encodedContent)
runBlocking { bobConversation.send(encodedContent = encodedContent) }
val messages = bobConversation.messages()
assertEquals(1, messages.size)
assertEquals("hi", messages[0].content())
Expand Down Expand Up @@ -821,7 +833,7 @@ class ConversationTest {
// Conversations you receive should start as unknown
assertTrue(isUnknown)

aliceConversation.send(content = "hey bob")
runBlocking { aliceConversation.send(content = "hey bob") }
aliceClient.contacts.refreshConsentList()
val isNowAllowed = aliceConversation.consentState() == ConsentState.ALLOWED

Expand Down
Loading

0 comments on commit 0134852

Please sign in to comment.