Skip to content

Commit

Permalink
update to use the new subscibe2 and make sure all the tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
nplasterer committed Nov 7, 2023
1 parent 96d2a96 commit b53c701
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import androidx.test.ext.junit.runners.AndroidJUnit4
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Ignore
import org.junit.Test
import org.junit.runner.RunWith
import org.xmtp.android.library.codecs.TextCodec
Expand Down Expand Up @@ -79,8 +77,7 @@ class ConversationsTest {
}

@Test
@Ignore("Flaky Test")
fun testStreamAllMessages() = runBlocking {
fun testStreamAllMessages() {
val bo = PrivateKeyBuilder()
val alix = PrivateKeyBuilder()
val clientOptions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import org.xmtp.proto.message.contents.PrivateKeyOuterClass.PrivateKeyBundle
import java.util.Date

@RunWith(AndroidJUnit4::class)
@Ignore("All Flaky")
class LocalInstrumentedTest {
@Test
fun testPublishingAndFetchingContactBundlesWithWhileGeneratingKeys() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ class FakeApiClient : ApiClient {
}
return flowOf()
}

override suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<MessageApiOuterClass.Envelope> {
TODO("Not yet implemented")
}
}

data class Fixtures(
Expand Down
19 changes: 8 additions & 11 deletions library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.xmtp.android.library

import android.util.Log
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.job
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
Expand All @@ -27,9 +27,7 @@ import org.xmtp.android.library.messages.senderAddress
import org.xmtp.android.library.messages.sentAt
import org.xmtp.android.library.messages.toSignedPublicKeyBundle
import org.xmtp.android.library.messages.walletAddress
import org.xmtp.android.library.push.Service.SubscribeRequest
import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
import org.xmtp.proto.message.contents.Contact
import org.xmtp.proto.message.contents.Invitation
import java.util.Date
Expand Down Expand Up @@ -369,7 +367,8 @@ data class Conversations(
topics.add(conversation.topic)
}

val subscribeFlow = createSubscribeFlow(topics)
val subscribeFlow = MutableStateFlow(makeSubscribeRequest(topics))

while (true) {
try {
client.subscribe2(request = subscribeFlow).collect { envelope ->
Expand All @@ -384,7 +383,7 @@ data class Conversations(
val conversation = fromInvite(envelope = envelope)
conversationsByTopic[conversation.topic] = conversation
topics.add(conversation.topic)
currentCoroutineContext().job.cancel()
subscribeFlow.value = makeSubscribeRequest(topics)
}

envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
Expand All @@ -393,19 +392,17 @@ data class Conversations(
val decoded = conversation.decode(envelope)
emit(decoded)
topics.add(conversation.topic)
currentCoroutineContext().job.cancel()
subscribeFlow.value = makeSubscribeRequest(topics)
}

else -> {}
}
}
} catch (error: CancellationException) {
break
} catch (error: Exception) {
continue
}
}
}

private fun createSubscribeFlow(topics: List<String>): Flow<MessageApiOuterClass.SubscribeRequest> = flow {
emit(makeSubscribeRequest(topics))
}
}
4 changes: 4 additions & 0 deletions library/src/test/java/org/xmtp/android/library/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ class FakeApiClient : ApiClient {
}
return flowOf()
}

override suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<MessageApiOuterClass.Envelope> {
TODO("Not yet implemented")
}
}

data class Fixtures(val aliceAccount: PrivateKeyBuilder, val bobAccount: PrivateKeyBuilder) {
Expand Down

0 comments on commit b53c701

Please sign in to comment.