Skip to content

Commit

Permalink
Merge pull request #254 from xmtp/cv/stream-groups
Browse files Browse the repository at this point in the history
Stream groups (android)
  • Loading branch information
cameronvoell authored Feb 13, 2024
2 parents 76d4e62 + 47eb60c commit 3aeaba1
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 7 deletions.
2 changes: 1 addition & 1 deletion android/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ dependencies {
implementation 'com.google.code.gson:gson:2.10.1'
implementation 'com.facebook.react:react-native:0.71.3'
implementation "com.daveanthonythomas.moshipack:moshipack:1.0.1"
implementation "org.xmtp:android:0.7.10"
implementation "org.xmtp:android:0.7.11"
// xmtp-android local testing setup below (comment org.xmtp:android above)
// implementation files('<PATH TO XMTP-ANDROID>/xmtp-android/library/build/outputs/aar/library-debug.aar')
// implementation 'com.google.crypto.tink:tink-android:1.7.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class XMTPModule : Module() {
"sign",
"authed",
"conversation",
"group",
"message",
"preEnableIdentityCallback",
"preCreateIdentityCallback"
Expand Down Expand Up @@ -602,9 +603,7 @@ class XMTPModule : Module() {
throw XMTPException("Create client with enableAlphaMLS true in order to create a group")
}
val group = client.conversations.newGroup(peerAddresses)
logV("id after creating group: " + Base64.encodeToString(group.id, NO_WRAP))
val encodedGroup = GroupWrapper.encode(client, group)
return@AsyncFunction encodedGroup
GroupWrapper.encode(client, group)
}

AsyncFunction("listMemberAddresses") { clientAddress: String, groupId: String ->
Expand All @@ -614,7 +613,7 @@ class XMTPModule : Module() {
throw XMTPException("Create client with enableAlphaMLS true in order to create a group")
}
val group = findGroup(clientAddress, groupId)
return@AsyncFunction group?.memberAddresses()
group?.memberAddresses()
}

AsyncFunction("syncGroups") { clientAddress: String ->
Expand Down Expand Up @@ -663,6 +662,11 @@ class XMTPModule : Module() {
subscribeToConversations(clientAddress = clientAddress)
}

Function("subscribeToGroups") { clientAddress: String ->
logV("subscribeToGroups")
subscribeToGroups(clientAddress = clientAddress)
}

Function("subscribeToAllMessages") { clientAddress: String ->
logV("subscribeToAllMessages")
subscribeToAllMessages(clientAddress = clientAddress)
Expand All @@ -689,6 +693,11 @@ class XMTPModule : Module() {
subscriptions[getConversationsKey(clientAddress)]?.cancel()
}

Function("unsubscribeFromGroups") { clientAddress: String ->
logV("unsubscribeFromGroups")
subscriptions[getGroupsKey(clientAddress)]?.cancel()
}

Function("unsubscribeFromAllMessages") { clientAddress: String ->
logV("unsubscribeFromAllMessages")
subscriptions[getMessagesKey(clientAddress)]?.cancel()
Expand Down Expand Up @@ -859,6 +868,28 @@ class XMTPModule : Module() {
}
}

private fun subscribeToGroups(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions[getGroupsKey(clientAddress)]?.cancel()
subscriptions[getGroupsKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.streamGroups().collect { group ->
sendEvent(
"group",
mapOf(
"clientAddress" to clientAddress,
"group" to GroupWrapper.encodeToObj(client, group, Base64.encodeToString(group.id, NO_WRAP))
)
)
}
} catch (e: Exception) {
Log.e("XMTPModule", "Error in group subscription: $e")
subscriptions[getGroupsKey(clientAddress)]?.cancel()
}
}
}

private fun subscribeToAllMessages(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

Expand Down Expand Up @@ -918,7 +949,6 @@ class XMTPModule : Module() {
CoroutineScope(Dispatchers.IO).launch {
try {
group.streamDecryptedMessages().collect { message ->
logV("Group Message before encoding" + message.toString())
sendEvent(
"message",
mapOf(
Expand All @@ -942,6 +972,10 @@ class XMTPModule : Module() {
return "conversations:$clientAddress"
}

private fun getGroupsKey(clientAddress: String): String {
return "groups:$clientAddress"
}

private fun unsubscribeFromMessages(
clientAddress: String,
topic: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.xmtp.android.library.Group
class GroupWrapper {

companion object {
private fun encodeToObj(client: Client, group: Group, id: String): Map<String, Any> {
fun encodeToObj(client: Client, group: Group, id: String): Map<String, Any> {
return mapOf(
"clientAddress" to client.address,
"id" to id,
Expand Down
78 changes: 78 additions & 0 deletions example/src/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
StaticAttachmentCodec,
RemoteAttachmentCodec,
RemoteAttachmentContent,
Group,
} from '../../src/index'

type EncodedContent = content.EncodedContent
Expand Down Expand Up @@ -490,6 +491,83 @@ test('can remove members from a group', async () => {
return true
})

test('can stream groups', async () => {
// Create three MLS enabled Clients
const aliceClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const bobClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const camClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})

// Start streaming groups
const groups: Group<any>[] = []
const cancelStreamGroups = await aliceClient.conversations.streamGroups(
async (group: Group<any>) => {
groups.push(group)
}
)

// Cam creates a group with Alice, so stream callback is fired
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const camGroup = await camClient.conversations.newGroup([aliceClient.address])
await delayToPropogate()
if ((groups.length as number) !== 1) {
throw Error('Unexpected num groups (should be 1): ' + groups.length)
}

// Bob creates a group with Alice so a stream callback is fired
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const bobgroup = await bobClient.conversations.newGroup([aliceClient.address])
await delayToPropogate()
if ((groups.length as number) !== 2) {
throw Error('Unexpected num groups (should be 2): ' + groups.length)
}

// * Note Alice creating a group does not trigger alice conversations
// group stream. Workaround is to syncGroups after you create and list manually

// Alice creates a group
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const aliceGroup = await aliceClient.conversations.newGroup([
bobClient.address,
camClient.address,
])
await delayToPropogate()
if (groups.length !== 2) {
throw Error('Expected group length 2 but it is: ' + groups.length)
}
// Sync groups after creation if you created a group
await aliceClient.conversations.syncGroups()
const listedGroups = await aliceClient.conversations.listGroups()
await delayToPropogate()
groups.push(listedGroups[listedGroups.length - 1])
if ((groups.length as number) !== 3) {
throw Error('Expected group length 3 but it is: ' + groups.length)
}

cancelStreamGroups()
await delayToPropogate()

// Creating a group should no longer trigger stream groups
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const camSecond = await camClient.conversations.newGroup([
aliceClient.address,
])
await delayToPropogate()
if ((groups.length as number) !== 3) {
throw Error('Unexpected num groups (should be 3): ' + groups.length)
}

return true
})

test('can pass a custom filter date and receive message objects with expected dates', async () => {
try {
const bob = await Client.createRandom({ env: 'local' })
Expand Down
8 changes: 8 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ export function subscribeToConversations(clientAddress: string) {
return XMTPModule.subscribeToConversations(clientAddress)
}

export function subscribeToGroups(clientAddress: string) {
return XMTPModule.subscribeToGroups(clientAddress)
}

export function subscribeToAllMessages(clientAddress: string) {
return XMTPModule.subscribeToAllMessages(clientAddress)
}
Expand All @@ -424,6 +428,10 @@ export function unsubscribeFromConversations(clientAddress: string) {
return XMTPModule.unsubscribeFromConversations(clientAddress)
}

export function unsubscribeFromGroups(clientAddress: string) {
return XMTPModule.unsubscribeFromGroups(clientAddress)
}

export function unsubscribeFromAllMessages(clientAddress: string) {
return XMTPModule.unsubscribeFromAllMessages(clientAddress)
}
Expand Down
38 changes: 38 additions & 0 deletions src/lib/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,37 @@ export default class Conversations<ContentTypes> {
return result
}

/**
* This method streams groups that the client is a member of.
*
* @returns {Promise<Group[]>} A Promise that resolves to an array of Group objects.
*/
async streamGroups(
callback: (group: Group<ContentTypes>) => Promise<void>
): Promise<() => void> {
XMTPModule.subscribeToGroups(this.client.address)
const groupsSubscription = XMTPModule.emitter.addListener(
'group',
async ({
clientAddress,
group,
}: {
clientAddress: string
group: Group<ContentTypes>
}) => {
if (this.known[group.id]) {
return
}
this.known[group.id] = true
await callback(new Group(this.client, group))
}
)
return () => {
groupsSubscription.remove()
XMTPModule.unsubscribeFromGroups(this.client.address)
}
}

/**
* Creates a new group.
*
Expand Down Expand Up @@ -166,6 +197,13 @@ export default class Conversations<ContentTypes> {
XMTPModule.unsubscribeFromConversations(this.client.address)
}

/**
* Cancels the stream for new conversations.
*/
cancelStreamGroups() {
XMTPModule.unsubscribeFromGroups(this.client.address)
}

/**
* Cancels the stream for new messages in all conversations.
*/
Expand Down

0 comments on commit 3aeaba1

Please sign in to comment.