diff --git a/src/index.ts b/src/index.ts index 0f31ff4ff..e9b4721fd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -506,8 +506,15 @@ export function subscribeToGroups(clientAddress: string) { return XMTPModule.subscribeToGroups(clientAddress) } -export function subscribeToAllMessages(clientAddress: string) { - return XMTPModule.subscribeToAllMessages(clientAddress) +export function subscribeToAllMessages( + clientAddress: string, + includeGroups: boolean +) { + return XMTPModule.subscribeToAllMessages(clientAddress, includeGroups) +} + +export function subscribeToAllGroupMessages(clientAddress: string) { + return XMTPModule.subscribeToAllGroupMessages(clientAddress) } export async function subscribeToMessages( @@ -529,6 +536,10 @@ export function unsubscribeFromAllMessages(clientAddress: string) { return XMTPModule.unsubscribeFromAllMessages(clientAddress) } +export function unsubscribeFromAllGroupMessages(clientAddress: string) { + return XMTPModule.unsubscribeFromAllGroupMessages(clientAddress) +} + export async function unsubscribeFromMessages( clientAddress: string, topic: string diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index 2c04a3de1..77337cc44 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -242,13 +242,48 @@ export default class Conversations< * Listen for new messages in all conversations. * * This method subscribes to all conversations in real-time and listens for incoming and outgoing messages. + * @param {boolean} includeGroups - Whether or not to include group messages in the stream. * @param {Function} callback - A callback function that will be invoked when a message is sent or received. * @returns {Promise} A Promise that resolves when the stream is set up. */ async streamAllMessages( + includeGroups: boolean = false, callback: (message: DecodedMessage) => Promise ): Promise { - XMTPModule.subscribeToAllMessages(this.client.address) + XMTPModule.subscribeToAllMessages(this.client.address, includeGroups) + XMTPModule.emitter.addListener( + 'message', + async ({ + clientAddress, + message, + }: { + clientAddress: string + message: DecodedMessage + }) => { + if (clientAddress !== this.client.address) { + return + } + if (this.known[message.id]) { + return + } + + this.known[message.id] = true + await callback(DecodedMessage.fromObject(message, this.client)) + } + ) + } + + /** + * Listen for new messages in all groups. + * + * This method subscribes to all groups in real-time and listens for incoming and outgoing messages. + * @param {Function} callback - A callback function that will be invoked when a message is sent or received. + * @returns {Promise} A Promise that resolves when the stream is set up. + */ + async streamAllGroupMessages( + callback: (message: DecodedMessage) => Promise + ): Promise { + XMTPModule.subscribeToAllGroupMessages(this.client.address) XMTPModule.emitter.addListener( 'message', async ({ @@ -291,4 +326,11 @@ export default class Conversations< cancelStreamAllMessages() { XMTPModule.unsubscribeFromAllMessages(this.client.address) } + + /** + * Cancels the stream for new messages in all groups. + */ + cancelStreamAllGroupMessages() { + XMTPModule.unsubscribeFromAllGroupMessages(this.client.address) + } }