From c516b984ce7be000b8acadc9f9b3deadec409af3 Mon Sep 17 00:00:00 2001 From: Alex Risch Date: Thu, 29 Feb 2024 16:57:02 -0700 Subject: [PATCH] fix: Additional checks on stream callbacks Added checks for ids on JS callbacks Added tests for streams with mutliple clients --- .../modules/xmtpreactnativesdk/XMTPModule.kt | 2 + example/src/tests/groupTests.ts | 305 +++++++++++++++++- example/src/tests/tests.ts | 130 ++++++++ ios/XMTPModule.swift | 2 + src/lib/Conversation.ts | 6 + src/lib/Conversations.ts | 3 + src/lib/Group.ts | 6 + 7 files changed, 452 insertions(+), 2 deletions(-) diff --git a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt index 4cd2836bd..73b9cc27d 100644 --- a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt +++ b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt @@ -1093,6 +1093,7 @@ class XMTPModule : Module() { mapOf( "clientAddress" to clientAddress, "message" to DecodedMessageWrapper.encodeMap(message), + "topic" to topic, ) ) } @@ -1119,6 +1120,7 @@ class XMTPModule : Module() { mapOf( "clientAddress" to clientAddress, "message" to DecodedMessageWrapper.encodeMap(message), + "groupId" to id, ) ) } diff --git a/example/src/tests/groupTests.ts b/example/src/tests/groupTests.ts index 53ed00e33..f48332c5d 100644 --- a/example/src/tests/groupTests.ts +++ b/example/src/tests/groupTests.ts @@ -58,7 +58,9 @@ test('can make a MLS V3 client with encryption key and database path', async () await client.conversations.newGroup([anotherClient.address]) assert( (await client.conversations.listGroups()).length === 1, - `should have a group size of 1 but was ${(await client.conversations.listGroups()).length}` + `should have a group size of 1 but was ${ + (await client.conversations.listGroups()).length + }` ) const bundle = await client.exportKeyBundle() @@ -77,7 +79,9 @@ test('can make a MLS V3 client with encryption key and database path', async () assert( (await clientFromBundle.conversations.listGroups()).length === 1, - `should have a group size of 1 but was ${(await clientFromBundle.conversations.listGroups()).length}` + `should have a group size of 1 but was ${ + (await clientFromBundle.conversations.listGroups()).length + }` ) return true }) @@ -932,3 +936,300 @@ test('can stream all group messages', async () => { return true }) + +test('can streamAll from multiple clients', async () => { + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + + // Setup stream alls + const allBoConversations: any[] = [] + const allAliConversations: any[] = [] + + await bo.conversations.streamAll(async (conversation) => { + allBoConversations.push(conversation) + }) + await alix.conversations.streamAll(async (conversation) => { + allAliConversations.push(conversation) + }) + + // Start Caro starts a new conversation. + await caro.conversations.newConversation(alix.address) + await delayToPropogate() + if (allBoConversations.length !== 0) { + throw Error( + 'Unexpected all conversations count for Bo ' + + allBoConversations.length + + ' and Alix had ' + + allAliConversations.length + ) + } + if (allAliConversations.length !== 1) { + throw Error( + 'Unexpected all conversations count ' + allAliConversations.length + ) + } + return true +}) + +test('can streamAll from multiple clients - swapped orderring', async () => { + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + + // Setup stream alls + const allBoConversations: any[] = [] + const allAliConversations: any[] = [] + + await alix.conversations.streamAll(async (conversation) => { + allAliConversations.push(conversation) + }) + + await bo.conversations.streamAll(async (conversation) => { + allBoConversations.push(conversation) + }) + + // Start Caro starts a new conversation. + await caro.conversations.newConversation(alix.address) + await delayToPropogate() + if (allBoConversations.length !== 0) { + throw Error( + 'Unexpected all conversations count for Bo ' + + allBoConversations.length + + ' and Alix had ' + + allAliConversations.length + ) + } + if (allAliConversations.length !== 1) { + throw Error( + 'Unexpected all conversations count ' + allAliConversations.length + ) + } + return true +}) + +test('can streamAllMessages from multiple clients', async () => { + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + + // Setup stream + const allBoMessages: any[] = [] + const allAliMessages: any[] = [] + + await bo.conversations.streamAllMessages(async (conversation) => { + allBoMessages.push(conversation) + }, true) + await alix.conversations.streamAllMessages(async (conversation) => { + allAliMessages.push(conversation) + }, true) + + // Start Caro starts a new conversation. + const caroConversation = await caro.conversations.newConversation( + alix.address + ) + await caroConversation.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error('Unexpected all messages count for Bo ' + allBoMessages.length) + } + + if (allAliMessages.length !== 1) { + throw Error( + 'Unexpected all conversations count for Ali ' + allAliMessages.length + ) + } + + return true +}) + +test('can streamAllMessages from multiple clients - swapped', async () => { + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + + // Setup stream + const allBoMessages: any[] = [] + const allAliMessages: any[] = [] + const caroGroup = await caro.conversations.newGroup([alix.address]) + + await alix.conversations.streamAllMessages(async (conversation) => { + allAliMessages.push(conversation) + }, true) + await bo.conversations.streamAllMessages(async (conversation) => { + allBoMessages.push(conversation) + }, true) + + // Start Caro starts a new conversation. + const caroConvo = await caro.conversations.newConversation(alix.address) + await delayToPropogate() + await caroConvo.send({ text: `Message` }) + await caroGroup.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error( + 'Unexpected all conversations count for Bo ' + allBoMessages.length + ) + } + + if (allAliMessages.length !== 2) { + throw Error( + 'Unexpected all conversations count for Ali ' + allAliMessages.length + ) + } + + return true +}) + +test('can stream all group Messages from multiple clients', async () => { + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + + // Setup stream + const allAlixMessages: DecodedMessage[] = [] + const allBoMessages: DecodedMessage[] = [] + const alixGroup = await caro.conversations.newGroup([alix.address]) + const boGroup = await caro.conversations.newGroup([bo.address]) + + await alixGroup.streamGroupMessages(async (message) => { + allAlixMessages.push(message) + }) + await boGroup.streamGroupMessages(async (message) => { + allBoMessages.push(message) + }) + + // Start Caro starts a new conversation. + await delayToPropogate() + await alixGroup.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error('Unexpected all messages count for Bo ' + allBoMessages.length) + } + + if (allAlixMessages.length !== 1) { + throw Error( + 'Unexpected all messages count for Ali ' + allAlixMessages.length + ) + } + + const alixConv = (await alix.conversations.listGroups())[0] + await alixConv.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error('Unexpected all messages count for Bo ' + allBoMessages.length) + } + // @ts-ignore-next-line + if (allAlixMessages.length !== 2) { + throw Error( + 'Unexpected all messages count for Ali ' + allAlixMessages.length + ) + } + + return true +}) + +test('can stream all group Messages from multiple clients - swapped', async () => { + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + + // Setup stream + const allAlixMessages: DecodedMessage[] = [] + const allBoMessages: DecodedMessage[] = [] + const alixGroup = await caro.conversations.newGroup([alix.address]) + const boGroup = await caro.conversations.newGroup([bo.address]) + + await boGroup.streamGroupMessages(async (message) => { + allBoMessages.push(message) + }) + await alixGroup.streamGroupMessages(async (message) => { + allAlixMessages.push(message) + }) + + // Start Caro starts a new conversation. + await delayToPropogate() + await alixGroup.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error('Unexpected all messages count for Bo ' + allBoMessages.length) + } + + if (allAlixMessages.length !== 1) { + throw Error( + 'Unexpected all messages count for Ali ' + allAlixMessages.length + ) + } + + const alixConv = (await alix.conversations.listGroups())[0] + await alixConv.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error('Unexpected all messages count for Bo ' + allBoMessages.length) + } + // @ts-ignore-next-line + if (allAlixMessages.length !== 2) { + throw Error( + 'Unexpected all messages count for Ali ' + allAlixMessages.length + ) + } + + return true +}) + +// Commenting this out so it doesn't block people, but nice to have? +// test('can stream messages for a long time', async () => { +// const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) +// await delayToPropogate() +// const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) +// await delayToPropogate() +// const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) +// await delayToPropogate() + +// // Setup stream alls +// const allBoMessages: any[] = [] +// const allAliMessages: any[] = [] + +// const group = await caro.conversations.newGroup([alix.address]) +// await bo.conversations.streamAllMessages(async (conversation) => { +// allBoMessages.push(conversation) +// }, true) +// await alix.conversations.streamAllMessages(async (conversation) => { +// allAliMessages.push(conversation) +// }, true) + +// // Wait for 15 minutes +// await delayToPropogate(15 * 1000 * 60) + +// // Start Caro starts a new conversation. +// const convo = await caro.conversations.newConversation(alix.address) +// await group.send({ text: 'hello' }) +// await convo.send({ text: 'hello' }) +// await delayToPropogate() +// if (allBoMessages.length !== 0) { +// throw Error('Unexpected all conversations count ' + allBoMessages.length) +// } +// if (allAliMessages.length !== 2) { +// throw Error('Unexpected all conversations count ' + allAliMessages.length) +// } + +// return true +// }) diff --git a/example/src/tests/tests.ts b/example/src/tests/tests.ts index 89fd7f1cf..f1a1448eb 100644 --- a/example/src/tests/tests.ts +++ b/example/src/tests/tests.ts @@ -929,3 +929,133 @@ test('instantiate frames client correctly', async () => { } return true }) + +// Skipping this test as it's not something supported right now +test('can stream all conversation Messages from multiple clients', async () => { + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + if (bo.address === alix.address) { + throw Error('Bo and Alix should have different addresses') + } + if (bo.address === caro.address) { + throw Error('Bo and Caro should have different addresses') + } + if (alix.address === caro.address) { + throw Error('Alix and Caro should have different addresses') + } + + // Setup stream + const allAlixMessages: DecodedMessage[] = [] + const allBoMessages: DecodedMessage[] = [] + const alixConvo = await caro.conversations.newConversation(alix.address) + const boConvo = await caro.conversations.newConversation(bo.address) + + await alixConvo.streamMessages(async (message) => { + allAlixMessages.push(message) + }) + await boConvo.streamMessages(async (message) => { + allBoMessages.push(message) + }) + + // Start Caro starts a new conversation. + await delayToPropogate() + await alixConvo.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error( + 'Unexpected all conversations count for Bo ' + allBoMessages.length + ) + } + + if (allAlixMessages.length !== 1) { + throw Error( + 'Unexpected all conversations count for Alix ' + allAlixMessages.length + ) + } + + const alixConv = (await alix.conversations.list())[0] + await alixConv.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error( + 'Unexpected all conversations count for Bo ' + allBoMessages.length + ) + } + // @ts-ignore-next-line + if (allAlixMessages.length !== 2) { + throw Error( + 'Unexpected all conversations count for Alix ' + allAlixMessages.length + ) + } + + return true +}) + +test('can stream all conversation Messages from multiple clients - swapped', async () => { + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + if (bo.address === alix.address) { + throw Error('Bo and Alix should have different addresses') + } + if (bo.address === caro.address) { + throw Error('Bo and Caro should have different addresses') + } + if (alix.address === caro.address) { + throw Error('Alix and Caro should have different addresses') + } + + // Setup stream + const allAlixMessages: DecodedMessage[] = [] + const allBoMessages: DecodedMessage[] = [] + const alixConvo = await caro.conversations.newConversation(alix.address) + const boConvo = await caro.conversations.newConversation(bo.address) + + await boConvo.streamMessages(async (message) => { + allBoMessages.push(message) + }) + await alixConvo.streamMessages(async (message) => { + allAlixMessages.push(message) + }) + + // Start Caro starts a new conversation. + await delayToPropogate() + await alixConvo.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error( + 'Unexpected all conversations count for Bo ' + allBoMessages.length + ) + } + + if (allAlixMessages.length !== 1) { + throw Error( + 'Unexpected all conversations count for Alix ' + allAlixMessages.length + ) + } + + const alixConv = (await alix.conversations.list())[0] + await alixConv.send({ text: `Message` }) + await delayToPropogate() + if (allBoMessages.length !== 0) { + throw Error( + 'Unexpected all conversations count for Bo ' + allBoMessages.length + ) + } + // @ts-ignore-next-line + if (allAlixMessages.length !== 2) { + throw Error( + 'Unexpected all conversations count for Alix ' + allAlixMessages.length + ) + } + + return true +}) + diff --git a/ios/XMTPModule.swift b/ios/XMTPModule.swift index 3fdbfc087..d7aabfa1b 100644 --- a/ios/XMTPModule.swift +++ b/ios/XMTPModule.swift @@ -1011,6 +1011,7 @@ public class XMTPModule: Module { try sendEvent("conversationMessage", [ "clientAddress": clientAddress, "message": DecodedMessageWrapper.encodeToObj(message, client: client), + "topic": topic ]) } catch { print("discarding message, unable to encode wrapper \(message.id)") @@ -1081,6 +1082,7 @@ public class XMTPModule: Module { try sendEvent("groupMessage", [ "clientAddress": clientAddress, "message": DecodedMessageWrapper.encodeToObj(message, client: client), + "groupId": id, ]) } catch { print("discarding message, unable to encode wrapper \(message.id)") diff --git a/src/lib/Conversation.ts b/src/lib/Conversation.ts index 2fc79c6b9..1fbad59fb 100644 --- a/src/lib/Conversation.ts +++ b/src/lib/Conversation.ts @@ -279,13 +279,19 @@ export class Conversation async ({ clientAddress, message, + topic, }: { clientAddress: string message: DecodedMessage + topic: string }) => { + // Long term these checks should be able to be done on the native layer as well, but additional checks in JS for safety if (clientAddress !== this.client.address) { return } + if (topic !== this.topic) { + return + } if (hasSeen[message.id]) { return } diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index f7a47e152..10c06f93d 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -214,6 +214,9 @@ export default class Conversations< clientAddress: string conversationContainer: ConversationContainer }) => { + if (clientAddress !== this.client.address) { + return + } if (this.known[conversationContainer.topic]) { return } diff --git a/src/lib/Group.ts b/src/lib/Group.ts index 4fc816876..811d63c94 100644 --- a/src/lib/Group.ts +++ b/src/lib/Group.ts @@ -133,13 +133,19 @@ export class Group< async ({ clientAddress, message, + groupId, }: { clientAddress: string message: DecodedMessage + groupId: string }) => { + // Long term these checks should be able to be done on the native layer as well, but additional checks in JS for safety if (clientAddress !== this.client.address) { return } + if (groupId !== this.id) { + return + } if (hasSeen[message.id]) { return }