Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation and Group implement common interface + Stream all #256

Merged
merged 10 commits into from
Feb 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import expo.modules.xmtpreactnativesdk.wrappers.DecodedMessageWrapper
import expo.modules.xmtpreactnativesdk.wrappers.DecryptedLocalAttachment
import expo.modules.xmtpreactnativesdk.wrappers.EncryptedLocalAttachment
import expo.modules.xmtpreactnativesdk.wrappers.GroupWrapper
import expo.modules.xmtpreactnativesdk.wrappers.ConversationContainerWrapper
import expo.modules.xmtpreactnativesdk.wrappers.PreparedLocalMessage
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -150,6 +151,7 @@ class XMTPModule : Module() {
"sign",
"authed",
"conversation",
"conversationContainer",
"group",
"message",
"preEnableIdentityCallback",
Expand Down Expand Up @@ -678,6 +680,11 @@ class XMTPModule : Module() {
subscribeToGroups(clientAddress = clientAddress)
}

Function("subscribeToAll") { clientAddress: String ->
logV("subscribeToAll")
subscribeToAll(clientAddress = clientAddress)
}

Function("subscribeToAllMessages") { clientAddress: String ->
logV("subscribeToAllMessages")
subscribeToAllMessages(clientAddress = clientAddress)
Expand Down Expand Up @@ -901,6 +908,28 @@ class XMTPModule : Module() {
}
}

private fun subscribeToAll(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions[getConversationsKey(clientAddress)]?.cancel()
subscriptions[getConversationsKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.streamAll().collect { conversation ->
sendEvent(
"conversationContainer",
mapOf(
"clientAddress" to clientAddress,
"conversationContainer" to ConversationContainerWrapper.encodeToObj(client, conversation)
)
)
}
} catch (e: Exception) {
Log.e("XMTPModule", "Error in subscription to groups + conversations: $e")
subscriptions[getConversationsKey(clientAddress)]?.cancel()
}
}
}

private fun subscribeToAllMessages(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package expo.modules.xmtpreactnativesdk.wrappers

import android.util.Base64
import com.google.gson.GsonBuilder
import org.xmtp.android.library.Client
import org.xmtp.android.library.Conversation

class ConversationContainerWrapper {

companion object {
fun encodeToObj(client: Client, conversation: Conversation): Map<String, Any> {
when (conversation.version) {
Conversation.Version.GROUP -> {
return mapOf(
"clientAddress" to client.address,
"id" to conversation.topic,
"createdAt" to conversation.createdAt.time,
"peerAddresses" to conversation.peerAddresses,
"version" to "group",
"topic" to conversation.topic
)
}
else -> {
val context = when (conversation.version) {
Conversation.Version.V2 -> mapOf<String, Any>(
"conversationID" to (conversation.conversationId ?: ""),
// TODO: expose the context/metadata explicitly in xmtp-android
"metadata" to conversation.toTopicData().invitation.context.metadataMap,
)

else -> mapOf()
}
return mapOf(
"clientAddress" to client.address,
"createdAt" to conversation.createdAt.time,
"context" to context,
"topic" to conversation.topic,
"peerAddress" to conversation.peerAddress,
"version" to if (conversation.version == Conversation.Version.V1) "v1" else "v2",
"conversationID" to (conversation.conversationId ?: ""),
"keyMaterial" to Base64.encodeToString(conversation.keyMaterial, Base64.NO_WRAP)
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe clean this up by using the existing wrapper classes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated fb87e3b 👍

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class GroupWrapper {
"id" to id,
"createdAt" to group.createdAt.time,
"peerAddresses" to group.memberAddresses(),

"version" to "group",
"topic" to id
)
}

Expand Down
84 changes: 84 additions & 0 deletions example/src/tests.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { content } from '@xmtp/proto'
import ReactNativeBlobUtil from 'react-native-blob-util'
import { TextEncoder, TextDecoder } from 'text-encoding'
import { ConversationContainer } from 'xmtp-react-native-sdk/lib/ConversationContainer'
import { DecodedMessage } from 'xmtp-react-native-sdk/lib/DecodedMessage'

import {

Check warning on line 7 in example/src/tests.ts

View workflow job for this annotation

GitHub Actions / lint

There should be at least one empty line between import groups
Query,
JSContentCodec,
Client,
Expand All @@ -13,6 +14,7 @@
RemoteAttachmentContent,
Group,
} from '../../src/index'
import { DefaultContentTypes } from 'xmtp-react-native-sdk/lib/types/DefaultContentType'

Check warning on line 17 in example/src/tests.ts

View workflow job for this annotation

GitHub Actions / lint

`xmtp-react-native-sdk/lib/types/DefaultContentType` import should occur before import of `../../src/index`

Check warning on line 17 in example/src/tests.ts

View workflow job for this annotation

GitHub Actions / lint

'DefaultContentTypes' is defined but never used

type EncodedContent = content.EncodedContent
type ContentTypeId = content.ContentTypeId
Expand Down Expand Up @@ -578,6 +580,85 @@
return true
})

test('can stream all groups and conversations', async () => {
// Create three MLS enabled Clients
const aliceClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const bobClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const camClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})

// Start streaming groups and conversations
const containers: ConversationContainer<any>[] = []
const cancelStreamAll = await aliceClient.conversations.streamAll(
async (conversationContainer: ConversationContainer<any>) => {
containers.push(conversationContainer)
}
)

// Bob creates a group with Alice, so stream callback is fired
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const bobGroup = await bobClient.conversations.newGroup([aliceClient.address])
await delayToPropogate()
if ((containers.length as number) !== 1) {
throw Error('Unexpected num groups (should be 1): ' + containers.length)
}
if (containers[0].isGroup()) {
(containers[0] as Group).sync()

Check warning on line 614 in example/src/tests.ts

View workflow job for this annotation

GitHub Actions / lint

Insert `;`
} else {
throw Error('Unexpected first ConversationContainer should be a group')
}

// Bob creates a v2 Conversation with Alice so a stream callback is fired
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const bobConversation = await bobClient.conversations.newConversation(
aliceClient.address
)
await delayToPropogate()
if ((containers.length as number) !== 2) {
throw Error('Unexpected num groups (should be 2): ' + containers.length)
}

if(bobConversation.conversationID != (containers[1] as Conversation<any>).conversationID) {

Check warning on line 629 in example/src/tests.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `(bobConversation.conversationID·!=·(containers[1]·as·Conversation<any>).conversationID` with `·(⏎····bobConversation.conversationID·!=⏎····(containers[1]·as·Conversation<any>).conversationID⏎··`

Check warning on line 629 in example/src/tests.ts

View workflow job for this annotation

GitHub Actions / lint

Expected '!==' and instead saw '!='
throw Error('Conversation from streamed all should match conversationID with created conversation')

Check warning on line 630 in example/src/tests.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `'Conversation·from·streamed·all·should·match·conversationID·with·created·conversation'` with `⏎······'Conversation·from·streamed·all·should·match·conversationID·with·created·conversation'⏎····`
}

// * Note Alice creating a v2 Conversation does trigger alice conversations
// stream.

// Alice creates a V2 Conversationgroup
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const aliceConversation = await aliceClient.conversations.newConversation(
camClient.address
)
await delayToPropogate()
if (containers.length !== 3) {
throw Error('Expected group length 3 but it is: ' + containers.length)
}

cancelStreamAll()
await delayToPropogate()

// Creating a group should no longer trigger stream groups
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const camConversation = await camClient.conversations.newGroup([
aliceClient.address,
])
await delayToPropogate()
if ((containers.length as number) !== 3) {
throw Error('Unexpected num groups (should be 3): ' + containers.length)
}

return true
})

test('can pass a custom filter date and receive message objects with expected dates', async () => {
try {
const bob = await Client.createRandom({ env: 'local' })
Expand Down Expand Up @@ -1291,7 +1372,10 @@
bob.register(new NumberCodec())
alice.register(new NumberCodec())

delayToPropogate()

const bobConvo = await bob.conversations.newConversation(alice.address)
delayToPropogate()
const aliceConvo = await alice.conversations.newConversation(bob.address)

await bobConvo.send(
Expand Down
4 changes: 4 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,10 @@ export function subscribeToConversations(clientAddress: string) {
return XMTPModule.subscribeToConversations(clientAddress)
}

export function subscribeToAll(clientAddress: string) {
return XMTPModule.subscribeToAll(clientAddress)
}

export function subscribeToGroups(clientAddress: string) {
return XMTPModule.subscribeToGroups(clientAddress)
}
Expand Down
22 changes: 14 additions & 8 deletions src/lib/Conversation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ContentTypeId } from './types/ContentCodec'
import {
ConversationVersion,
ConversationContainer,
SendOptions,
} from './ConversationContainer'
import { ConversationSendPayload } from './types/ConversationCodecs'
import { DefaultContentTypes } from './types/DefaultContentType'
import * as XMTP from '../index'
Expand All @@ -8,17 +12,15 @@ import {
PreparedLocalMessage,
} from '../index'

export type SendOptions = {
contentType?: ContentTypeId
}

export class Conversation<ContentTypes extends DefaultContentTypes> {
export class Conversation<ContentTypes extends DefaultContentTypes>
implements ConversationContainer<ContentTypes>
{
client: XMTP.Client<ContentTypes>
createdAt: number
context?: ConversationContext
topic: string
peerAddress: string
version: string
version: ConversationVersion
conversationID?: string | undefined
/**
* Base64 encoded key material for the conversation.
Expand All @@ -32,7 +34,7 @@ export class Conversation<ContentTypes extends DefaultContentTypes> {
context?: ConversationContext
topic: string
peerAddress: string
version: string
version: ConversationVersion
conversationID?: string | undefined
keyMaterial?: string | undefined
}
Expand Down Expand Up @@ -301,4 +303,8 @@ export class Conversation<ContentTypes extends DefaultContentTypes> {
XMTP.unsubscribeFromMessages(this.client.address, this.topic)
}
}

isGroup(): boolean {
return false
}
}
23 changes: 23 additions & 0 deletions src/lib/ConversationContainer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { ContentTypeId } from './types/ContentCodec'
import { DefaultContentTypes } from './types/DefaultContentType'
import * as XMTP from '../index'

export type SendOptions = {
contentType?: ContentTypeId
}

export enum ConversationVersion {
V1 = 'v1',
V2 = 'v2',
GROUP = 'group',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DirectMessage and Group RN doesn't need to know about V1 or V2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated here - 4816fd4

}

export interface ConversationContainer<
ContentTypes extends DefaultContentTypes,
> {
client: XMTP.Client<ContentTypes>
createdAt: number
version: ConversationVersion
topic: string
isGroup(): boolean
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove is Group in favor of version

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed here - 4816fd4

}
57 changes: 57 additions & 0 deletions src/lib/Conversations.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { Client } from './Client'
import { Conversation } from './Conversation'
import {
ConversationVersion,
ConversationContainer,
} from './ConversationContainer'
import { DecodedMessage } from './DecodedMessage'
import { Group } from './Group'
import { ConversationContext } from '../XMTP.types'
Expand Down Expand Up @@ -160,6 +164,59 @@ export default class Conversations<
)
}

/**
* Sets up a real-time stream to listen for new conversations and groups being started.
*
* This method subscribes to conversations in real-time and listens for incoming conversation and group events.
* When a new conversation is detected, the provided callback function is invoked with the details of the conversation.
* @param {Function} callback - A callback function that will be invoked with the new Conversation when a conversation is started.
* @returns {Promise<void>} A Promise that resolves when the stream is set up.
* @warning This stream will continue infinitely. To end the stream, you can call the function returned by this streamAll.
*/
async streamAll(
callback: (
conversation: ConversationContainer<ContentTypes>
) => Promise<void>
) {
XMTPModule.subscribeToAll(this.client.address)
const subscription = XMTPModule.emitter.addListener(
'conversationContainer',
async ({
clientAddress,
conversationContainer,
}: {
clientAddress: string
conversationContainer: ConversationContainer<ContentTypes>
}) => {
if (this.known[conversationContainer.topic]) {
return
}

this.known[conversationContainer.topic] = true
console.log(
'Version on emitter call: ' +
JSON.stringify({ clientAddress, conversationContainer })
)
if (conversationContainer.version === ConversationVersion.GROUP) {
return await callback(
new Group(this.client, conversationContainer as Group<ContentTypes>)
)
} else {
return await callback(
new Conversation(
this.client,
conversationContainer as Conversation<ContentTypes>
)
)
}
}
)
return () => {
subscription.remove()
this.cancelStream()
}
}

/**
* Listen for new messages in all conversations.
*
Expand Down
Loading
Loading