From 7ca505e80db3dd7d5f86af94fa4b22b2f213b34e Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Tue, 13 Feb 2024 08:04:05 -0800 Subject: [PATCH] Added conv and group implement common interface. stream all --- .../modules/xmtpreactnativesdk/XMTPModule.kt | 29 ++++++++ .../wrappers/GroupWrapper.kt | 3 +- .../wrappers/IConversationWrapper.kt | 47 ++++++++++++ example/src/tests.ts | 71 +++++++++++++++++++ src/index.ts | 4 ++ src/lib/Conversation.ts | 22 +++--- src/lib/Conversations.ts | 52 ++++++++++++++ src/lib/Group.ts | 16 ++++- src/lib/IConversation.ts | 21 ++++++ 9 files changed, 254 insertions(+), 11 deletions(-) create mode 100644 android/src/main/java/expo/modules/xmtpreactnativesdk/wrappers/IConversationWrapper.kt create mode 100644 src/lib/IConversation.ts diff --git a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt index b514698b9..8deaab58d 100644 --- a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt +++ b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt @@ -19,6 +19,7 @@ import expo.modules.xmtpreactnativesdk.wrappers.DecodedMessageWrapper import expo.modules.xmtpreactnativesdk.wrappers.DecryptedLocalAttachment import expo.modules.xmtpreactnativesdk.wrappers.EncryptedLocalAttachment import expo.modules.xmtpreactnativesdk.wrappers.GroupWrapper +import expo.modules.xmtpreactnativesdk.wrappers.IConversationWrapper import expo.modules.xmtpreactnativesdk.wrappers.PreparedLocalMessage import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope @@ -150,6 +151,7 @@ class XMTPModule : Module() { "sign", "authed", "conversation", + "IConversation", "group", "message", "preEnableIdentityCallback", @@ -667,6 +669,11 @@ class XMTPModule : Module() { subscribeToGroups(clientAddress = clientAddress) } + Function("subscribeToAll") { clientAddress: String -> + logV("subscribeToAll") + subscribeToAll(clientAddress = clientAddress) + } + Function("subscribeToAllMessages") { clientAddress: String -> logV("subscribeToAllMessages") subscribeToAllMessages(clientAddress = clientAddress) @@ -890,6 +897,28 @@ class XMTPModule : Module() { } } + private fun subscribeToAll(clientAddress: String) { + val client = clients[clientAddress] ?: throw XMTPException("No client") + + subscriptions[getConversationsKey(clientAddress)]?.cancel() + subscriptions[getConversationsKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch { + try { + client.conversations.streamAll().collect { conversation -> + sendEvent( + "IConversation", + mapOf( + "clientAddress" to clientAddress, + "iConversation" to IConversationWrapper.encodeToObj(client, conversation) + ) + ) + } + } catch (e: Exception) { + Log.e("XMTPModule", "Error in subscription to groups + conversations: $e") + subscriptions[getConversationsKey(clientAddress)]?.cancel() + } + } + } + private fun subscribeToAllMessages(clientAddress: String) { val client = clients[clientAddress] ?: throw XMTPException("No client") diff --git a/android/src/main/java/expo/modules/xmtpreactnativesdk/wrappers/GroupWrapper.kt b/android/src/main/java/expo/modules/xmtpreactnativesdk/wrappers/GroupWrapper.kt index 8c3c6da3f..59a400ba5 100644 --- a/android/src/main/java/expo/modules/xmtpreactnativesdk/wrappers/GroupWrapper.kt +++ b/android/src/main/java/expo/modules/xmtpreactnativesdk/wrappers/GroupWrapper.kt @@ -15,7 +15,8 @@ class GroupWrapper { "id" to id, "createdAt" to group.createdAt.time, "peerAddresses" to group.memberAddresses(), - + "version" to "group", + "topic" to id ) } diff --git a/android/src/main/java/expo/modules/xmtpreactnativesdk/wrappers/IConversationWrapper.kt b/android/src/main/java/expo/modules/xmtpreactnativesdk/wrappers/IConversationWrapper.kt new file mode 100644 index 000000000..88caab55a --- /dev/null +++ b/android/src/main/java/expo/modules/xmtpreactnativesdk/wrappers/IConversationWrapper.kt @@ -0,0 +1,47 @@ +package expo.modules.xmtpreactnativesdk.wrappers + +import android.util.Base64 +import com.google.gson.GsonBuilder +import org.xmtp.android.library.Client +import org.xmtp.android.library.Conversation + +class IConversationWrapper { + + companion object { + fun encodeToObj(client: Client, conversation: Conversation): Map { + when (conversation.version) { + Conversation.Version.GROUP -> { + return mapOf( + "clientAddress" to client.address, + "id" to conversation.topic, + "createdAt" to conversation.createdAt.time, + "peerAddresses" to conversation.peerAddresses, + "version" to "group", + "topic" to conversation.topic + ) + } + else -> { + val context = when (conversation.version) { + Conversation.Version.V2 -> mapOf( + "conversationID" to (conversation.conversationId ?: ""), + // TODO: expose the context/metadata explicitly in xmtp-android + "metadata" to conversation.toTopicData().invitation.context.metadataMap, + ) + + else -> mapOf() + } + return mapOf( + "clientAddress" to client.address, + "createdAt" to conversation.createdAt.time, + "context" to context, + "topic" to conversation.topic, + "peerAddress" to conversation.peerAddress, + "version" to if (conversation.version == Conversation.Version.V1) "v1" else "v2", + "conversationID" to (conversation.conversationId ?: ""), + "keyMaterial" to Base64.encodeToString(conversation.keyMaterial, Base64.NO_WRAP) + ) + } + } + } + } +} diff --git a/example/src/tests.ts b/example/src/tests.ts index e4d9bb667..58f747d14 100644 --- a/example/src/tests.ts +++ b/example/src/tests.ts @@ -2,6 +2,7 @@ import { content } from '@xmtp/proto' import ReactNativeBlobUtil from 'react-native-blob-util' import { TextEncoder, TextDecoder } from 'text-encoding' import { DecodedMessage } from 'xmtp-react-native-sdk/lib/DecodedMessage' +import { IConversation } from 'xmtp-react-native-sdk/lib/IConversation' import { Query, @@ -568,6 +569,76 @@ test('can stream groups', async () => { return true }) +test('can stream groups and conversations', async () => { + // Create three MLS enabled Clients + const aliceClient = await Client.createRandom({ + env: 'local', + enableAlphaMls: true, + }) + const bobClient = await Client.createRandom({ + env: 'local', + enableAlphaMls: true, + }) + const camClient = await Client.createRandom({ + env: 'local', + enableAlphaMls: true, + }) + + // Start streaming groups + const groups: IConversation[] = [] + const cancelStreamAll = await aliceClient.conversations.streamAll( + async (iConversation: IConversation) => { + groups.push(iConversation) + } + ) + + // Cam creates a group with Alice, so stream callback is fired + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const camGroup = await camClient.conversations.newGroup([aliceClient.address]) + await delayToPropogate() + if ((groups.length as number) !== 1) { + throw Error('Unexpected num groups (should be 1): ' + groups.length) + } + + // Bob creates a v2 Conversation with Alice so a stream callback is fired + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const bobConversation = await bobClient.conversations.newConversation( + aliceClient.address + ) + await delayToPropogate() + if ((groups.length as number) !== 2) { + throw Error('Unexpected num groups (should be 2): ' + groups.length) + } + + // * Note Alice creating a v2 Conversation does trigger alice conversations + // stream. + + // Alice creates a V2 Conversationgroup + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const aliceConversation = await aliceClient.conversations.newConversation( + camClient.address + ) + await delayToPropogate() + if (groups.length !== 3) { + throw Error('Expected group length 3 but it is: ' + groups.length) + } + + cancelStreamAll() + await delayToPropogate() + + // Creating a group should no longer trigger stream groups + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const camSecond = await camClient.conversations.newGroup([ + aliceClient.address, + ]) + await delayToPropogate() + if ((groups.length as number) !== 3) { + throw Error('Unexpected num groups (should be 3): ' + groups.length) + } + + return true +}) + test('can pass a custom filter date and receive message objects with expected dates', async () => { try { const bob = await Client.createRandom({ env: 'local' }) diff --git a/src/index.ts b/src/index.ts index 1a092e44c..bcf2d1583 100644 --- a/src/index.ts +++ b/src/index.ts @@ -424,6 +424,10 @@ export function subscribeToConversations(clientAddress: string) { return XMTPModule.subscribeToConversations(clientAddress) } +export function subscribeToAll(clientAddress: string) { + return XMTPModule.subscribeToAll(clientAddress) +} + export function subscribeToGroups(clientAddress: string) { return XMTPModule.subscribeToGroups(clientAddress) } diff --git a/src/lib/Conversation.ts b/src/lib/Conversation.ts index 436ad8185..db4b313bb 100644 --- a/src/lib/Conversation.ts +++ b/src/lib/Conversation.ts @@ -1,4 +1,8 @@ -import { ContentTypeId } from './types/ContentCodec' +import { + ConversationVersion, + IConversation, + SendOptions, +} from './IConversation' import { ConversationSendPayload } from './types/ConversationCodecs' import { DefaultContentTypes } from './types/DefaultContentType' import * as XMTP from '../index' @@ -8,17 +12,15 @@ import { PreparedLocalMessage, } from '../index' -export type SendOptions = { - contentType?: ContentTypeId -} - -export class Conversation { +export class Conversation + implements IConversation +{ client: XMTP.Client createdAt: number context?: ConversationContext topic: string peerAddress: string - version: string + version: ConversationVersion conversationID?: string | undefined /** * Base64 encoded key material for the conversation. @@ -32,7 +34,7 @@ export class Conversation { context?: ConversationContext topic: string peerAddress: string - version: string + version: ConversationVersion conversationID?: string | undefined keyMaterial?: string | undefined } @@ -301,4 +303,8 @@ export class Conversation { XMTP.unsubscribeFromMessages(this.client.address, this.topic) } } + + isGroup(): boolean { + return false + } } diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index 57d9fae6a..8eeb4b1f1 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -2,6 +2,7 @@ import { Client } from './Client' import { Conversation } from './Conversation' import { DecodedMessage } from './DecodedMessage' import { Group } from './Group' +import { ConversationVersion, IConversation } from './IConversation' import { ConversationContext } from '../XMTP.types' import * as XMTPModule from '../index' import { ContentCodec } from '../index' @@ -160,6 +161,57 @@ export default class Conversations< ) } + /** + * Sets up a real-time stream to listen for new conversations and groups being started. + * + * This method subscribes to conversations in real-time and listens for incoming conversation and group events. + * When a new conversation is detected, the provided callback function is invoked with the details of the conversation. + * @param {Function} callback - A callback function that will be invoked with the new Conversation when a conversation is started. + * @returns {Promise} A Promise that resolves when the stream is set up. + * @warning This stream will continue infinitely. To end the stream, you can call the function returned by this streamAll. + */ + async streamAll( + callback: (conversation: IConversation) => Promise + ) { + XMTPModule.subscribeToAll(this.client.address) + const subscription = XMTPModule.emitter.addListener( + 'IConversation', + async ({ + clientAddress, + iConversation, + }: { + clientAddress: string + iConversation: IConversation + }) => { + if (this.known[iConversation.topic]) { + return + } + + this.known[iConversation.topic] = true + console.log( + 'Version on emitter call: ' + + JSON.stringify({ clientAddress, iConversation }) + ) + if (iConversation.version === ConversationVersion.GROUP) { + return await callback( + new Group(this.client, iConversation as Group) + ) + } else { + return await callback( + new Conversation( + this.client, + iConversation as Conversation + ) + ) + } + } + ) + return () => { + subscription.remove() + this.cancelStream() + } + } + /** * Listen for new messages in all conversations. * diff --git a/src/lib/Group.ts b/src/lib/Group.ts index 842cdb2f2..02253329a 100644 --- a/src/lib/Group.ts +++ b/src/lib/Group.ts @@ -1,16 +1,23 @@ -import { SendOptions } from './Conversation' import { DecodedMessage } from './DecodedMessage' +import { + SendOptions, + ConversationVersion, + IConversation, +} from './IConversation' import { ConversationSendPayload } from './types/ConversationCodecs' import { DefaultContentTypes } from './types/DefaultContentType' import * as XMTP from '../index' export class Group< ContentTypes extends DefaultContentTypes = DefaultContentTypes, -> { +> implements IConversation +{ client: XMTP.Client id: string createdAt: number peerAddresses: string[] + version = ConversationVersion.GROUP + topic: string constructor( client: XMTP.Client, @@ -24,6 +31,7 @@ export class Group< this.id = params.id this.createdAt = params.createdAt this.peerAddresses = params.peerAddresses + this.topic = params.id } get clientAddress(): string { @@ -126,4 +134,8 @@ export class Group< async removeMembers(addresses: string[]): Promise { return XMTP.removeGroupMembers(this.client.address, this.id, addresses) } + + isGroup(): boolean { + return true + } } diff --git a/src/lib/IConversation.ts b/src/lib/IConversation.ts new file mode 100644 index 000000000..18f470329 --- /dev/null +++ b/src/lib/IConversation.ts @@ -0,0 +1,21 @@ +import { ContentTypeId } from './types/ContentCodec' +import { DefaultContentTypes } from './types/DefaultContentType' +import * as XMTP from '../index' + +export type SendOptions = { + contentType?: ContentTypeId +} + +export enum ConversationVersion { + V1 = 'v1', + V2 = 'v2', + GROUP = 'group', +} + +export interface IConversation { + client: XMTP.Client + createdAt: number + version: ConversationVersion + topic: string + isGroup(): boolean +}