Skip to content

Commit

Permalink
Merge pull request #256 from xmtp/cv/conversation-interface
Browse files Browse the repository at this point in the history
Conversation and Group implement common interface + Stream all
  • Loading branch information
cameronvoell authored Feb 14, 2024
2 parents ac99485 + 79b2526 commit bcd9875
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import expo.modules.xmtpreactnativesdk.wrappers.DecodedMessageWrapper
import expo.modules.xmtpreactnativesdk.wrappers.DecryptedLocalAttachment
import expo.modules.xmtpreactnativesdk.wrappers.EncryptedLocalAttachment
import expo.modules.xmtpreactnativesdk.wrappers.GroupWrapper
import expo.modules.xmtpreactnativesdk.wrappers.ConversationContainerWrapper
import expo.modules.xmtpreactnativesdk.wrappers.PreparedLocalMessage
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -150,6 +151,7 @@ class XMTPModule : Module() {
"sign",
"authed",
"conversation",
"conversationContainer",
"group",
"message",
"preEnableIdentityCallback",
Expand Down Expand Up @@ -678,6 +680,11 @@ class XMTPModule : Module() {
subscribeToGroups(clientAddress = clientAddress)
}

Function("subscribeToAll") { clientAddress: String ->
logV("subscribeToAll")
subscribeToAll(clientAddress = clientAddress)
}

Function("subscribeToAllMessages") { clientAddress: String ->
logV("subscribeToAllMessages")
subscribeToAllMessages(clientAddress = clientAddress)
Expand Down Expand Up @@ -901,6 +908,28 @@ class XMTPModule : Module() {
}
}

private fun subscribeToAll(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions[getConversationsKey(clientAddress)]?.cancel()
subscriptions[getConversationsKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.streamAll().collect { conversation ->
sendEvent(
"conversationContainer",
mapOf(
"clientAddress" to clientAddress,
"conversationContainer" to ConversationContainerWrapper.encodeToObj(client, conversation)
)
)
}
} catch (e: Exception) {
Log.e("XMTPModule", "Error in subscription to groups + conversations: $e")
subscriptions[getConversationsKey(clientAddress)]?.cancel()
}
}
}

private fun subscribeToAllMessages(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package expo.modules.xmtpreactnativesdk.wrappers

import android.util.Base64
import org.xmtp.android.library.Client
import org.xmtp.android.library.Conversation

class ConversationContainerWrapper {

companion object {
fun encodeToObj(client: Client, conversation: Conversation): Map<String, Any> {
when (conversation.version) {
Conversation.Version.GROUP -> {
val group = (conversation as Conversation.Group).group
return GroupWrapper.encodeToObj(client, group, Base64.encodeToString(group.id,
Base64.NO_WRAP
))
}
else -> {
return ConversationWrapper.encodeToObj(client, conversation)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ConversationWrapper {
"context" to context,
"topic" to conversation.topic,
"peerAddress" to conversation.peerAddress,
"version" to if (conversation.version == Conversation.Version.V1) "v1" else "v2",
"version" to "DIRECT",
"conversationID" to (conversation.conversationId ?: ""),
"keyMaterial" to Base64.encodeToString(conversation.keyMaterial, Base64.NO_WRAP)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class GroupWrapper {
"id" to id,
"createdAt" to group.createdAt.time,
"peerAddresses" to group.memberAddresses(),

"version" to "GROUP",
"topic" to id
)
}

Expand Down
91 changes: 91 additions & 0 deletions example/src/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
RemoteAttachmentCodec,
RemoteAttachmentContent,
Group,
ConversationContainer,
ConversationVersion,
} from '../../src/index'

type EncodedContent = content.EncodedContent
Expand Down Expand Up @@ -578,6 +580,92 @@ test('can stream groups', async () => {
return true
})

test('can stream all groups and conversations', async () => {
// Create three MLS enabled Clients
const aliceClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const bobClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const camClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})

// Start streaming groups and conversations
const containers: ConversationContainer<any>[] = []
const cancelStreamAll = await aliceClient.conversations.streamAll(
async (conversationContainer: ConversationContainer<any>) => {
containers.push(conversationContainer)
}
)

// Bob creates a group with Alice, so stream callback is fired
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const bobGroup = await bobClient.conversations.newGroup([aliceClient.address])
await delayToPropogate()
if ((containers.length as number) !== 1) {
throw Error('Unexpected num groups (should be 1): ' + containers.length)
}
if (containers[0].version === ConversationVersion.GROUP) {
;(containers[0] as Group).sync()
} else {
console.log(JSON.stringify(containers[0] as Group))
throw Error('Unexpected first ConversationContainer should be a group')
}

// Bob creates a v2 Conversation with Alice so a stream callback is fired
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const bobConversation = await bobClient.conversations.newConversation(
aliceClient.address
)
await delayToPropogate()
if ((containers.length as number) !== 2) {
throw Error('Unexpected num groups (should be 2): ' + containers.length)
}

if (
containers[1].version === ConversationVersion.DIRECT &&
bobConversation.conversationID !==
(containers[1] as Conversation<any>).conversationID
) {
throw Error(
'Conversation from streamed all should match conversationID with created conversation'
)
}

// * Note Alice creating a v2 Conversation does trigger alice conversations
// stream.

// Alice creates a V2 Conversationgroup
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const aliceConversation = await aliceClient.conversations.newConversation(
camClient.address
)
await delayToPropogate()
if (containers.length !== 3) {
throw Error('Expected group length 3 but it is: ' + containers.length)
}

cancelStreamAll()
await delayToPropogate()

// Creating a group should no longer trigger stream groups
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const camConversation = await camClient.conversations.newGroup([
aliceClient.address,
])
await delayToPropogate()
if ((containers.length as number) !== 3) {
throw Error('Unexpected num groups (should be 3): ' + containers.length)
}

return true
})

test('can pass a custom filter date and receive message objects with expected dates', async () => {
try {
const bob = await Client.createRandom({ env: 'local' })
Expand Down Expand Up @@ -1291,7 +1379,10 @@ test('register and use custom content types', async () => {
bob.register(new NumberCodec())
alice.register(new NumberCodec())

delayToPropogate()

const bobConvo = await bob.conversations.newConversation(alice.address)
delayToPropogate()
const aliceConvo = await alice.conversations.newConversation(bob.address)

await bobConvo.send(
Expand Down
2 changes: 1 addition & 1 deletion ios/Wrappers/ConversationWrapper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ struct ConversationWrapper {
"createdAt": UInt64(conversation.createdAt.timeIntervalSince1970 * 1000),
"context": context,
"peerAddress": conversation.peerAddress,
"version": conversation.version == .v1 ? "v1" : "v2",
"version": "DIRECT",
"conversationID": conversation.conversationID ?? "",
"keyMaterial": conversation.keyMaterial?.base64EncodedString() ?? ""
]
Expand Down
8 changes: 8 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,10 @@ export function subscribeToConversations(clientAddress: string) {
return XMTPModule.subscribeToConversations(clientAddress)
}

export function subscribeToAll(clientAddress: string) {
return XMTPModule.subscribeToAll(clientAddress)
}

export function subscribeToGroups(clientAddress: string) {
return XMTPModule.subscribeToGroups(clientAddress)
}
Expand Down Expand Up @@ -556,6 +560,10 @@ export * from './XMTP.types'
export { Client } from './lib/Client'
export * from './lib/ContentCodec'
export { Conversation } from './lib/Conversation'
export {
ConversationContainer,
ConversationVersion,
} from './lib/ConversationContainer'
export { Query } from './lib/Query'
export { XMTPPush } from './lib/XMTPPush'
export { ConsentListEntry, DecodedMessage }
Expand Down
18 changes: 9 additions & 9 deletions src/lib/Conversation.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import { ContentTypeId } from './types/ContentCodec'
import {
ConversationVersion,
ConversationContainer,
} from './ConversationContainer'
import { ConversationSendPayload } from './types/ConversationCodecs'
import { DefaultContentTypes } from './types/DefaultContentType'
import { SendOptions } from './types/SendOptions'
import * as XMTP from '../index'
import {
ConversationContext,
DecodedMessage,
PreparedLocalMessage,
} from '../index'

export type SendOptions = {
contentType?: ContentTypeId
}

export class Conversation<ContentTypes extends DefaultContentTypes> {
export class Conversation<ContentTypes extends DefaultContentTypes>
implements ConversationContainer<ContentTypes>
{
client: XMTP.Client<ContentTypes>
createdAt: number
context?: ConversationContext
topic: string
peerAddress: string
version: string
version = ConversationVersion.DIRECT
conversationID?: string | undefined
/**
* Base64 encoded key material for the conversation.
Expand All @@ -32,7 +34,6 @@ export class Conversation<ContentTypes extends DefaultContentTypes> {
context?: ConversationContext
topic: string
peerAddress: string
version: string
conversationID?: string | undefined
keyMaterial?: string | undefined
}
Expand All @@ -42,7 +43,6 @@ export class Conversation<ContentTypes extends DefaultContentTypes> {
this.context = params.context
this.topic = params.topic
this.peerAddress = params.peerAddress
this.version = params.version
this.conversationID = params.conversationID
this.keyMaterial = params.keyMaterial
}
Expand Down
16 changes: 16 additions & 0 deletions src/lib/ConversationContainer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { DefaultContentTypes } from './types/DefaultContentType'
import * as XMTP from '../index'

export enum ConversationVersion {
DIRECT = 'DIRECT',
GROUP = 'GROUP',
}

export interface ConversationContainer<
ContentTypes extends DefaultContentTypes,
> {
client: XMTP.Client<ContentTypes>
createdAt: number
version: ConversationVersion
topic: string
}
57 changes: 57 additions & 0 deletions src/lib/Conversations.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { Client } from './Client'
import { Conversation } from './Conversation'
import {
ConversationVersion,
ConversationContainer,
} from './ConversationContainer'
import { DecodedMessage } from './DecodedMessage'
import { Group } from './Group'
import { ConversationContext } from '../XMTP.types'
Expand Down Expand Up @@ -160,6 +164,59 @@ export default class Conversations<
)
}

/**
* Sets up a real-time stream to listen for new conversations and groups being started.
*
* This method subscribes to conversations in real-time and listens for incoming conversation and group events.
* When a new conversation is detected, the provided callback function is invoked with the details of the conversation.
* @param {Function} callback - A callback function that will be invoked with the new Conversation when a conversation is started.
* @returns {Promise<void>} A Promise that resolves when the stream is set up.
* @warning This stream will continue infinitely. To end the stream, you can call the function returned by this streamAll.
*/
async streamAll(
callback: (
conversation: ConversationContainer<ContentTypes>
) => Promise<void>
) {
XMTPModule.subscribeToAll(this.client.address)
const subscription = XMTPModule.emitter.addListener(
'conversationContainer',
async ({
clientAddress,
conversationContainer,
}: {
clientAddress: string
conversationContainer: ConversationContainer<ContentTypes>
}) => {
if (this.known[conversationContainer.topic]) {
return
}

this.known[conversationContainer.topic] = true
console.log(
'Version on emitter call: ' +
JSON.stringify({ clientAddress, conversationContainer })
)
if (conversationContainer.version === ConversationVersion.GROUP) {
return await callback(
new Group(this.client, conversationContainer as Group<ContentTypes>)
)
} else {
return await callback(
new Conversation(
this.client,
conversationContainer as Conversation<ContentTypes>
)
)
}
}
)
return () => {
subscription.remove()
this.cancelStream()
}
}

/**
* Listen for new messages in all conversations.
*
Expand Down
Loading

0 comments on commit bcd9875

Please sign in to comment.