Skip to content

Commit

Permalink
Merge pull request #244 from xmtp/cv/groups-streaming
Browse files Browse the repository at this point in the history
Groups streaming (android)
  • Loading branch information
cameronvoell authored Feb 9, 2024
2 parents d12fbd9 + 7865000 commit 12eacf9
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -482,14 +482,14 @@ class XMTPModule : Module() {
)
}

AsyncFunction("sendMessageToGroup") { clientAddress: String, idString: String, contentJson: String ->
AsyncFunction("sendMessageToGroup") { clientAddress: String, id: String, contentJson: String ->
logV("sendMessageToGroup")
val group =
findGroup(
clientAddress = clientAddress,
idString = idString
id = id
)
?: throw XMTPException("no group found for $idString")
?: throw XMTPException("no group found for $id")
val sending = ContentJson.fromJson(contentJson)
group.send(
content = sending.content,
Expand Down Expand Up @@ -676,6 +676,14 @@ class XMTPModule : Module() {
)
}

AsyncFunction("subscribeToGroupMessages") { clientAddress: String, id: String ->
logV("subscribeToGroupMessages")
subscribeToGroupMessages(
clientAddress = clientAddress,
id = id
)
}

Function("unsubscribeFromConversations") { clientAddress: String ->
logV("unsubscribeFromConversations")
subscriptions[getConversationsKey(clientAddress)]?.cancel()
Expand All @@ -694,6 +702,14 @@ class XMTPModule : Module() {
)
}

AsyncFunction("unsubscribeFromGroupMessages") { clientAddress: String, id: String ->
logV("unsubscribeFromGroupMessages")
unsubscribeFromGroupMessages(
clientAddress = clientAddress,
id = id
)
}

Function("registerPushToken") { pushServer: String, token: String ->
logV("registerPushToken")
xmtpPush = XMTPPush(appContext.reactContext!!, pushServer)
Expand Down Expand Up @@ -802,17 +818,17 @@ class XMTPModule : Module() {

private fun findGroup(
clientAddress: String,
idString: String,
id: String,
): Group? {
val client = clients[clientAddress] ?: throw XMTPException("No client")

val cacheKey = "${clientAddress}:${idString}"
val cacheKey = "${clientAddress}:${id}"
val cacheGroup = groups[cacheKey]
if (cacheGroup != null) {
return cacheGroup
} else {
val group = client.conversations.listGroups()
.firstOrNull { Base64.encodeToString(it.id, NO_WRAP) == idString }
.firstOrNull { Base64.encodeToString(it.id, NO_WRAP) == id }
if (group != null) {
groups[group.cacheKey(clientAddress)] = group
return group
Expand Down Expand Up @@ -891,6 +907,33 @@ class XMTPModule : Module() {
}
}

private fun subscribeToGroupMessages(clientAddress: String, id: String) {
val group =
findGroup(
clientAddress = clientAddress,
id = id
) ?: return
subscriptions[group.cacheKey(clientAddress)]?.cancel()
subscriptions[group.cacheKey(clientAddress)] =
CoroutineScope(Dispatchers.IO).launch {
try {
group.streamDecryptedMessages().collect { message ->
logV("Group Message before encoding" + message.toString())
sendEvent(
"message",
mapOf(
"clientAddress" to clientAddress,
"message" to DecodedMessageWrapper.encodeMap(message),
)
)
}
} catch (e: Exception) {
Log.e("XMTPModule", "Error in messages subscription: $e")
subscriptions[group.cacheKey(clientAddress)]?.cancel()
}
}
}

private fun getMessagesKey(clientAddress: String): String {
return "messages:$clientAddress"
}
Expand All @@ -911,6 +954,18 @@ class XMTPModule : Module() {
subscriptions[conversation.cacheKey(clientAddress)]?.cancel()
}

private fun unsubscribeFromGroupMessages(
clientAddress: String,
id: String,
) {
val conversation =
findGroup(
clientAddress = clientAddress,
id = id
) ?: return
subscriptions[conversation.cacheKey(clientAddress)]?.cancel()
}

private fun logV(msg: String) {
if (isDebugEnabled) {
Log.v("XMTPModule", msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import org.xmtp.android.library.Group
class GroupWrapper {

companion object {
private fun encodeToObj(client: Client, group: Group, idString: String): Map<String, Any> {
private fun encodeToObj(client: Client, group: Group, id: String): Map<String, Any> {
return mapOf(
"clientAddress" to client.address,
"id" to idString,
"id" to id,
"createdAt" to group.createdAt.time,
"peerAddresses" to group.memberAddresses(),

Expand Down
85 changes: 74 additions & 11 deletions example/src/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ function assert(condition: boolean, msg: string) {
}
}

function delayToPropogate(): Promise<void> {
async function delayToPropogate(): Promise<void> {
// delay 1s to avoid clobbering
return new Promise((r) => setTimeout(r, 100))
}
Expand All @@ -99,6 +99,7 @@ test('can make a client', async () => {
})

test('can make a MLS V3 client', async () => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const client = await Client.createRandom({
env: 'local',
appVersion: 'Testing/0.0.0',
Expand Down Expand Up @@ -161,6 +162,7 @@ test('can make a MLS V3 client from bundle', async () => {

test('production MLS V3 client creation throws error', async () => {
try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const client = await Client.createRandom({
env: 'production',
appVersion: 'Testing/0.0.0',
Expand Down Expand Up @@ -193,7 +195,7 @@ test('can message in a group', async () => {

// Alice's num groups start at 0
let aliceGroups = await aliceClient.conversations.listGroups()
if (aliceGroups.length != 0) {
if (aliceGroups.length !== 0) {
throw new Error('num groups should be 0')
}

Expand All @@ -205,13 +207,13 @@ test('can message in a group', async () => {

// Alice's num groups == 1
aliceGroups = await aliceClient.conversations.listGroups()
if (aliceGroups.length != 1) {
if (aliceGroups.length !== 1) {
throw new Error('num groups should be 1')
}

// Alice can confirm memberAddresses
const memberAddresses = await aliceGroup.memberAddresses()
if (memberAddresses.length != 3) {
if (memberAddresses.length !== 3) {
throw new Error('num group members should be 3')
}
const lowercasedAddresses: string[] = memberAddresses.map((s) =>
Expand All @@ -234,7 +236,7 @@ test('can message in a group', async () => {
// Bob's num groups == 1
await bobClient.conversations.syncGroups()
const bobGroups = await bobClient.conversations.listGroups()
if (bobGroups.length != 1) {
if (bobGroups.length !== 1) {
throw new Error(
'num groups for bob should be 1, but it is' + bobGroups.length
)
Expand All @@ -243,15 +245,15 @@ test('can message in a group', async () => {
// Bob can read messages from Alice
await bobGroups[0].sync()
const bobMessages: DecodedMessage[] = await bobGroups[0].messages()
if (bobMessages.length != 2) {
if (bobMessages.length !== 2) {
throw new Error(
'num messages for bob should be 2, but it is' + bobMessages.length
)
}
if (bobMessages[0].content() != 'gm') {
if (bobMessages[0].content() !== 'gm') {
throw new Error("newest message should be 'gm'")
}
if (bobMessages[1].content() != 'hello, world') {
if (bobMessages[1].content() !== 'hello, world') {
throw new Error("newest message should be 'hello, world'")
}
// Bob can send a message
Expand All @@ -260,7 +262,7 @@ test('can message in a group', async () => {
// Cam's num groups == 1
await camClient.conversations.syncGroups()
const camGroups = await camClient.conversations.listGroups()
if (camGroups.length != 1) {
if (camGroups.length !== 1) {
throw new Error(
'num groups for cam should be 1, but it is' + camGroups.length
)
Expand All @@ -269,10 +271,10 @@ test('can message in a group', async () => {
// Cam can read messages from Alice and Bob
await camGroups[0].sync()
const camMessages = await camGroups[0].messages()
if (camMessages[1].content() != 'gm') {
if (camMessages[1].content() !== 'gm') {
throw new Error("second Message should be 'gm'")
}
if (camMessages[0].content() != 'hey guys!') {
if (camMessages[0].content() !== 'hey guys!') {
throw new Error("newest Message should be 'hey guys!'")
}

Expand Down Expand Up @@ -860,6 +862,67 @@ test('can stream messages', async () => {
return true
})

test('can stream group messages', async () => {
// Create three MLS enabled Clients
const aliceClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const bobClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})
const camClient = await Client.createRandom({
env: 'local',
enableAlphaMls: true,
})

// Alice creates a group
const aliceGroup = await aliceClient.conversations.newGroup([
bobClient.address,
camClient.address,
])

// Record message stream for this group
const groupMessages: DecodedMessage[] = []
const cancelGroupMessageStream = await aliceGroup.streamGroupMessages(
async (message) => {
groupMessages.push(message)
}
)

// Bob's num groups == 1
await bobClient.conversations.syncGroups()
const bobGroup = (await bobClient.conversations.listGroups())[0]

for (let i = 0; i < 5; i++) {
await bobGroup.send({ text: `Message ${i}` })
await delayToPropogate()
}

if (groupMessages.length !== 5) {
throw Error('Unexpected convo messages count ' + groupMessages.length)
}
for (let i = 0; i < 5; i++) {
if (groupMessages[i].content() !== `Message ${i}`) {
throw Error(
'Unexpected group message content ' + groupMessages[i].content()
)
}
}

cancelGroupMessageStream()
for (let i = 0; i < 5; i++) {
await bobGroup.send({ text: `Message ${i}` })
}

if (groupMessages.length !== 5) {
throw Error('Unexpected convo messages count ' + groupMessages.length)
}

return true
})

test('remote attachments should work', async () => {
const alice = await Client.createRandom({
env: 'local',
Expand Down
14 changes: 14 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ export async function syncGroup(clientAddress: string, id: string) {
await XMTPModule.syncGroup(clientAddress, id)
}

export async function subscribeToGroupMessages(
clientAddress: string,
id: string
) {
return await XMTPModule.subscribeToGroupMessages(clientAddress, id)
}

export async function unsubscribeFromGroupMessages(
clientAddress: string,
id: string
) {
return await XMTPModule.unsubscribeFromGroupMessages(clientAddress, id)
}

export async function addGroupMembers(
clientAddress: string,
id: string,
Expand Down
43 changes: 43 additions & 0 deletions src/lib/Group.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,49 @@ export class Group<ContentTypes> {
await XMTP.syncGroup(this.client.address, this.id)
}

/**
* Sets up a real-time message stream for the current group.
*
* This method subscribes to incoming messages in real-time and listens for new message events.
* When a new message is detected, the provided callback function is invoked with the details of the message.
* Additionally, this method returns a function that can be called to unsubscribe and end the message stream.
*
* @param {Function} callback - A callback function that will be invoked with the new DecodedMessage when a message is received.
* @returns {Function} A function that, when called, unsubscribes from the message stream and ends real-time updates.
*/
streamGroupMessages(
callback: (message: DecodedMessage) => Promise<void>
): () => void {
XMTP.subscribeToGroupMessages(this.client.address, this.id)
const hasSeen = {}
const messageSubscription = XMTP.emitter.addListener(
'message',
async ({
clientAddress,
message,
}: {
clientAddress: string
message: DecodedMessage
}) => {
if (clientAddress !== this.client.address) {
return
}
if (hasSeen[message.id]) {
return
}

hasSeen[message.id] = true

message.client = this.client
await callback(DecodedMessage.fromObject(message, this.client))
}
)
return () => {
messageSubscription.remove()
XMTP.unsubscribeFromGroupMessages(this.client.address, this.id)
}
}

async addMembers(addresses: string[]): Promise<void> {
return XMTP.addGroupMembers(this.client.address, this.id, addresses)
}
Expand Down

0 comments on commit 12eacf9

Please sign in to comment.