Skip to content

Commit

Permalink
Added conv and group implement common interface. stream all
Browse files Browse the repository at this point in the history
  • Loading branch information
cameronvoell committed Feb 13, 2024
1 parent 8282ee2 commit 7ca505e
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import expo.modules.xmtpreactnativesdk.wrappers.DecodedMessageWrapper
import expo.modules.xmtpreactnativesdk.wrappers.DecryptedLocalAttachment
import expo.modules.xmtpreactnativesdk.wrappers.EncryptedLocalAttachment
import expo.modules.xmtpreactnativesdk.wrappers.GroupWrapper
import expo.modules.xmtpreactnativesdk.wrappers.IConversationWrapper
import expo.modules.xmtpreactnativesdk.wrappers.PreparedLocalMessage
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -150,6 +151,7 @@ class XMTPModule : Module() {
"sign",
"authed",
"conversation",
"IConversation",
"group",
"message",
"preEnableIdentityCallback",
Expand Down Expand Up @@ -667,6 +669,11 @@ class XMTPModule : Module() {
subscribeToGroups(clientAddress = clientAddress)
}

Function("subscribeToAll") { clientAddress: String ->
logV("subscribeToAll")
subscribeToAll(clientAddress = clientAddress)
}

Function("subscribeToAllMessages") { clientAddress: String ->
logV("subscribeToAllMessages")
subscribeToAllMessages(clientAddress = clientAddress)
Expand Down Expand Up @@ -890,6 +897,28 @@ class XMTPModule : Module() {
}
}

private fun subscribeToAll(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions[getConversationsKey(clientAddress)]?.cancel()
subscriptions[getConversationsKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.streamAll().collect { conversation ->
sendEvent(
"IConversation",
mapOf(
"clientAddress" to clientAddress,
"iConversation" to IConversationWrapper.encodeToObj(client, conversation)
)
)
}
} catch (e: Exception) {
Log.e("XMTPModule", "Error in subscription to groups + conversations: $e")
subscriptions[getConversationsKey(clientAddress)]?.cancel()
}
}
}

private fun subscribeToAllMessages(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class GroupWrapper {
"id" to id,
"createdAt" to group.createdAt.time,
"peerAddresses" to group.memberAddresses(),

"version" to "group",
"topic" to id
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package expo.modules.xmtpreactnativesdk.wrappers

import android.util.Base64
import com.google.gson.GsonBuilder
import org.xmtp.android.library.Client
import org.xmtp.android.library.Conversation

class IConversationWrapper {

companion object {
fun encodeToObj(client: Client, conversation: Conversation): Map<String, Any> {
when (conversation.version) {
Conversation.Version.GROUP -> {
return mapOf(
"clientAddress" to client.address,
"id" to conversation.topic,
"createdAt" to conversation.createdAt.time,
"peerAddresses" to conversation.peerAddresses,
"version" to "group",
"topic" to conversation.topic
)
}
else -> {
val context = when (conversation.version) {
Conversation.Version.V2 -> mapOf<String, Any>(
"conversationID" to (conversation.conversationId ?: ""),
// TODO: expose the context/metadata explicitly in xmtp-android
"metadata" to conversation.toTopicData().invitation.context.metadataMap,
)

else -> mapOf()
}
return mapOf(
"clientAddress" to client.address,
"createdAt" to conversation.createdAt.time,
"context" to context,
"topic" to conversation.topic,
"peerAddress" to conversation.peerAddress,
"version" to if (conversation.version == Conversation.Version.V1) "v1" else "v2",
"conversationID" to (conversation.conversationId ?: ""),
"keyMaterial" to Base64.encodeToString(conversation.keyMaterial, Base64.NO_WRAP)
)
}
}
}
}
}
71 changes: 71 additions & 0 deletions example/src/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { content } from '@xmtp/proto'
import ReactNativeBlobUtil from 'react-native-blob-util'
import { TextEncoder, TextDecoder } from 'text-encoding'
import { DecodedMessage } from 'xmtp-react-native-sdk/lib/DecodedMessage'
import { IConversation } from 'xmtp-react-native-sdk/lib/IConversation'

import {
Query,
Expand Down Expand Up @@ -568,6 +569,76 @@ test('can stream groups', async () => {
return true
})

test('can stream groups and conversations', async () => {
// Create three MLS enabled Clients
const aliceClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const bobClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const camClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})

// Start streaming groups
const groups: IConversation<any>[] = []
const cancelStreamAll = await aliceClient.conversations.streamAll(
async (iConversation: IConversation<any>) => {
groups.push(iConversation)
}
)

// Cam creates a group with Alice, so stream callback is fired
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const camGroup = await camClient.conversations.newGroup([aliceClient.address])
await delayToPropogate()
if ((groups.length as number) !== 1) {
throw Error('Unexpected num groups (should be 1): ' + groups.length)
}

// Bob creates a v2 Conversation with Alice so a stream callback is fired
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const bobConversation = await bobClient.conversations.newConversation(
aliceClient.address
)
await delayToPropogate()
if ((groups.length as number) !== 2) {
throw Error('Unexpected num groups (should be 2): ' + groups.length)
}

// * Note Alice creating a v2 Conversation does trigger alice conversations
// stream.

// Alice creates a V2 Conversationgroup
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const aliceConversation = await aliceClient.conversations.newConversation(
camClient.address
)
await delayToPropogate()
if (groups.length !== 3) {
throw Error('Expected group length 3 but it is: ' + groups.length)
}

cancelStreamAll()
await delayToPropogate()

// Creating a group should no longer trigger stream groups
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const camSecond = await camClient.conversations.newGroup([
aliceClient.address,
])
await delayToPropogate()
if ((groups.length as number) !== 3) {
throw Error('Unexpected num groups (should be 3): ' + groups.length)
}

return true
})

test('can pass a custom filter date and receive message objects with expected dates', async () => {
try {
const bob = await Client.createRandom({ env: 'local' })
Expand Down
4 changes: 4 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ export function subscribeToConversations(clientAddress: string) {
return XMTPModule.subscribeToConversations(clientAddress)
}

export function subscribeToAll(clientAddress: string) {
return XMTPModule.subscribeToAll(clientAddress)
}

export function subscribeToGroups(clientAddress: string) {
return XMTPModule.subscribeToGroups(clientAddress)
}
Expand Down
22 changes: 14 additions & 8 deletions src/lib/Conversation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ContentTypeId } from './types/ContentCodec'
import {
ConversationVersion,
IConversation,
SendOptions,
} from './IConversation'
import { ConversationSendPayload } from './types/ConversationCodecs'
import { DefaultContentTypes } from './types/DefaultContentType'
import * as XMTP from '../index'
Expand All @@ -8,17 +12,15 @@ import {
PreparedLocalMessage,
} from '../index'

export type SendOptions = {
contentType?: ContentTypeId
}

export class Conversation<ContentTypes extends DefaultContentTypes> {
export class Conversation<ContentTypes extends DefaultContentTypes>
implements IConversation<ContentTypes>
{
client: XMTP.Client<ContentTypes>
createdAt: number
context?: ConversationContext
topic: string
peerAddress: string
version: string
version: ConversationVersion
conversationID?: string | undefined
/**
* Base64 encoded key material for the conversation.
Expand All @@ -32,7 +34,7 @@ export class Conversation<ContentTypes extends DefaultContentTypes> {
context?: ConversationContext
topic: string
peerAddress: string
version: string
version: ConversationVersion
conversationID?: string | undefined
keyMaterial?: string | undefined
}
Expand Down Expand Up @@ -301,4 +303,8 @@ export class Conversation<ContentTypes extends DefaultContentTypes> {
XMTP.unsubscribeFromMessages(this.client.address, this.topic)
}
}

isGroup(): boolean {
return false
}
}
52 changes: 52 additions & 0 deletions src/lib/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Client } from './Client'
import { Conversation } from './Conversation'
import { DecodedMessage } from './DecodedMessage'
import { Group } from './Group'
import { ConversationVersion, IConversation } from './IConversation'
import { ConversationContext } from '../XMTP.types'
import * as XMTPModule from '../index'
import { ContentCodec } from '../index'
Expand Down Expand Up @@ -160,6 +161,57 @@ export default class Conversations<
)
}

/**
* Sets up a real-time stream to listen for new conversations and groups being started.
*
* This method subscribes to conversations in real-time and listens for incoming conversation and group events.
* When a new conversation is detected, the provided callback function is invoked with the details of the conversation.
* @param {Function} callback - A callback function that will be invoked with the new Conversation when a conversation is started.
* @returns {Promise<void>} A Promise that resolves when the stream is set up.
* @warning This stream will continue infinitely. To end the stream, you can call the function returned by this streamAll.
*/
async streamAll(
callback: (conversation: IConversation<ContentTypes>) => Promise<void>
) {
XMTPModule.subscribeToAll(this.client.address)
const subscription = XMTPModule.emitter.addListener(
'IConversation',
async ({
clientAddress,
iConversation,
}: {
clientAddress: string
iConversation: IConversation<ContentTypes>
}) => {
if (this.known[iConversation.topic]) {
return
}

this.known[iConversation.topic] = true
console.log(
'Version on emitter call: ' +
JSON.stringify({ clientAddress, iConversation })
)
if (iConversation.version === ConversationVersion.GROUP) {
return await callback(
new Group(this.client, iConversation as Group<ContentTypes>)
)
} else {
return await callback(
new Conversation(
this.client,
iConversation as Conversation<ContentTypes>
)
)
}
}
)
return () => {
subscription.remove()
this.cancelStream()
}
}

/**
* Listen for new messages in all conversations.
*
Expand Down
16 changes: 14 additions & 2 deletions src/lib/Group.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
import { SendOptions } from './Conversation'
import { DecodedMessage } from './DecodedMessage'
import {
SendOptions,
ConversationVersion,
IConversation,
} from './IConversation'
import { ConversationSendPayload } from './types/ConversationCodecs'
import { DefaultContentTypes } from './types/DefaultContentType'
import * as XMTP from '../index'

export class Group<
ContentTypes extends DefaultContentTypes = DefaultContentTypes,
> {
> implements IConversation<ContentTypes>
{
client: XMTP.Client<ContentTypes>
id: string
createdAt: number
peerAddresses: string[]
version = ConversationVersion.GROUP
topic: string

constructor(
client: XMTP.Client<ContentTypes>,
Expand All @@ -24,6 +31,7 @@ export class Group<
this.id = params.id
this.createdAt = params.createdAt
this.peerAddresses = params.peerAddresses
this.topic = params.id
}

get clientAddress(): string {
Expand Down Expand Up @@ -126,4 +134,8 @@ export class Group<
async removeMembers(addresses: string[]): Promise<void> {
return XMTP.removeGroupMembers(this.client.address, this.id, addresses)
}

isGroup(): boolean {
return true
}
}
21 changes: 21 additions & 0 deletions src/lib/IConversation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { ContentTypeId } from './types/ContentCodec'
import { DefaultContentTypes } from './types/DefaultContentType'
import * as XMTP from '../index'

export type SendOptions = {
contentType?: ContentTypeId
}

export enum ConversationVersion {
V1 = 'v1',
V2 = 'v2',
GROUP = 'group',
}

export interface IConversation<ContentTypes extends DefaultContentTypes> {
client: XMTP.Client<ContentTypes>
createdAt: number
version: ConversationVersion
topic: string
isGroup(): boolean
}

0 comments on commit 7ca505e

Please sign in to comment.