Skip to content

Commit

Permalink
add the conversations methods
Browse files Browse the repository at this point in the history
  • Loading branch information
nplasterer committed Oct 25, 2024
1 parent dec7860 commit f4b10aa
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 8 deletions.
16 changes: 8 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1048,12 +1048,12 @@ export async function unsubscribeFromMessages(inboxId: string, topic: string) {
return await XMTPModule.unsubscribeFromMessages(inboxId, topic)
}

export async function subscribeToV3Conversations(inboxId: string) {
return await XMTPModule.subscribeToV3Conversations(inboxId)
export function subscribeToV3Conversations(inboxId: string) {
return XMTPModule.subscribeToV3Conversations(inboxId)
}

export async function subscribeToAllConversationMessages(inboxId: string) {
return await XMTPModule.subscribeToAllConversationMessages(inboxId)
export function subscribeToAllConversationMessages(inboxId: string) {
return XMTPModule.subscribeToAllConversationMessages(inboxId)
}

export async function subscribeToConversationMessages(
Expand All @@ -1063,12 +1063,12 @@ export async function subscribeToConversationMessages(
return await XMTPModule.subscribeToConversationMessages(inboxId, id)
}

export async function unsubscribeFromAllConversationMessages(inboxId: string) {
return await XMTPModule.unsubscribeFromAllConversationMessages(inboxId)
export function unsubscribeFromAllConversationMessages(inboxId: string) {
return XMTPModule.unsubscribeFromAllConversationMessages(inboxId)
}

export async function unsubscribeFromV3Conversations(inboxId: string) {
return await XMTPModule.unsubscribeFromV3Conversations(inboxId)
export function unsubscribeFromV3Conversations(inboxId: string) {
return XMTPModule.unsubscribeFromV3Conversations(inboxId)
}

export async function unsubscribeFromConversationMessages(
Expand Down
173 changes: 173 additions & 0 deletions src/lib/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { ConversationContext } from '../XMTP.types'
import * as XMTPModule from '../index'
import { ContentCodec } from '../index'
import { getAddress } from '../utils/address'
import { Dm } from './Dm'

export default class Conversations<
ContentTypes extends ContentCodec<any>[] = [],
Expand Down Expand Up @@ -81,6 +82,18 @@ export default class Conversations<
)
}

/**
* Creates a new V3 conversation.
*
* This method creates a new conversation with the specified peer address.
*
* @param {string} peerAddress - The address of the peer to create a conversation with.
* @returns {Promise<Dm>} A Promise that resolves to a Dm object.
*/
async findOrCreateDm(peerAddress: string): Promise<Dm<ContentTypes>> {
return await XMTPModule.findOrCreateDm(this.client, peerAddress)
}

/**
* This method returns a list of all groups that the client is a member of.
* To get the latest list of groups from the network, call syncGroups() first.
Expand Down Expand Up @@ -114,6 +127,40 @@ export default class Conversations<
return await XMTPModule.findGroup(this.client, groupId)
}

/**
* This method returns a Dm by the address if that dm exists in the local database.
* To get the latest list of groups from the network, call syncConversations() first.
*
* @returns {Promise<Dm>} A Promise that resolves to a Group or undefined if not found.
*/
async findDm(address: string): Promise<Dm<ContentTypes> | undefined> {
return await XMTPModule.findDm(this.client, address)
}

/**
* This method returns a conversation by the topic if that conversation exists in the local database.
* To get the latest list of groups from the network, call syncConversations() first.
*
* @returns {Promise<ConversationContainer>} A Promise that resolves to a Group or undefined if not found.
*/
async findConversationByTopic(
topic: string
): Promise<ConversationContainer<ContentTypes> | undefined> {
return await XMTPModule.findConversationByTopic(this.client, topic)
}

/**
* This method returns a conversation by the conversation id if that conversation exists in the local database.
* To get the latest list of groups from the network, call syncConversations() first.
*
* @returns {Promise<ConversationContainer>} A Promise that resolves to a Group or undefined if not found.
*/
async findConversation(
conversationId: string
): Promise<ConversationContainer<ContentTypes> | undefined> {
return await XMTPModule.findConversation(this.client, conversationId)
}

/**
* This method returns a message by the message id if that message exists in the local database.
* To get the latest list of messages from the network, call syncGroups() first.
Expand Down Expand Up @@ -142,6 +189,16 @@ export default class Conversations<
return result
}

/**
* This method returns a list of all V3 conversations that the client is a member of.
* To include the latest groups from the network in the returned list, call syncGroups() first.
*
* @returns {Promise<ConversationContainer[]>} A Promise that resolves to an array of ConversationContainer objects.
*/
async listConversations(): Promise<ConversationContainer<ContentTypes>[]> {
return await XMTPModule.listConversations(this.client)
}

/**
* This method streams groups that the client is a member of.
*
Expand Down Expand Up @@ -171,6 +228,58 @@ export default class Conversations<
}
}

/**
* This method streams V3 conversations that the client is a member of.
*
* @returns {Promise<ConversationContainer[]>} A Promise that resolves to an array of ConversationContainer objects.
*/
async streamConversations(
callback: (
conversation: ConversationContainer<ContentTypes>
) => Promise<void>
): Promise<() => void> {
XMTPModule.subscribeToV3Conversations(this.client.inboxId)
const subscription = XMTPModule.emitter.addListener(
EventTypes.ConversationV3,
async ({
inboxId,
conversation,
}: {
inboxId: string
conversation: ConversationContainer<ContentTypes>
}) => {
if (inboxId !== this.client.inboxId) {
return
}

this.known[conversation.topic] = true
if (conversation.version === ConversationVersion.GROUP) {
const members = conversation['members'].map((mem: string) => {
return Member.from(mem)
})
return await callback(
new Group(
this.client,
conversation as unknown as GroupParams,
members
)
)
} else if (conversation.version === ConversationVersion.DM) {
const members = conversation['members'].map((mem: string) => {
return Member.from(mem)
})
return await callback(
new Dm(this.client, conversation as unknown as GroupParams, members)
)
}
}
)
return () => {
subscription.remove()
XMTPModule.unsubscribeFromV3Conversations(this.client.inboxId)
}
}

/**
* Creates a new group.
*
Expand Down Expand Up @@ -413,6 +522,40 @@ export default class Conversations<
this.subscriptions[EventTypes.AllGroupMessage] = subscription
}

/**
* Listen for new messages in all v3 conversations.
*
* This method subscribes to all groups in real-time and listens for incoming and outgoing messages.
* @param {Function} callback - A callback function that will be invoked when a message is sent or received.
* @returns {Promise<void>} A Promise that resolves when the stream is set up.
*/
async streamAllConversationMessages(
callback: (message: DecodedMessage<ContentTypes>) => Promise<void>
): Promise<void> {
XMTPModule.subscribeToAllConversationMessages(this.client.inboxId)
const subscription = XMTPModule.emitter.addListener(
EventTypes.AllConversationMessage,
async ({
inboxId,
message,
}: {
inboxId: string
message: DecodedMessage
}) => {
if (inboxId !== this.client.inboxId) {
return
}
if (this.known[message.id]) {
return
}

this.known[message.id] = true
await callback(DecodedMessage.fromObject(message, this.client))
}
)
this.subscriptions[EventTypes.AllConversationMessage] = subscription
}

async fromWelcome(encryptedMessage: string): Promise<Group<ContentTypes>> {
try {
return await XMTPModule.processWelcomeMessage(
Expand All @@ -425,6 +568,20 @@ export default class Conversations<
}
}

async conversationFromWelcome(
encryptedMessage: string
): Promise<ConversationContainer<ContentTypes>> {
try {
return await XMTPModule.processConversationWelcomeMessage(
this.client,
encryptedMessage
)
} catch (e) {
console.info('ERROR in processWelcomeMessage()', e)
throw e
}
}

/**
* Cancels the stream for new conversations.
*/
Expand All @@ -447,6 +604,14 @@ export default class Conversations<
XMTPModule.unsubscribeFromGroups(this.client.inboxId)
}

cancelStreamConversations() {
if (this.subscriptions[EventTypes.ConversationV3]) {
this.subscriptions[EventTypes.ConversationV3].remove()
delete this.subscriptions[EventTypes.ConversationV3]
}
XMTPModule.unsubscribeFromV3Conversations(this.client.inboxId)
}

/**
* Cancels the stream for new messages in all conversations.
*/
Expand All @@ -468,4 +633,12 @@ export default class Conversations<
}
XMTPModule.unsubscribeFromAllGroupMessages(this.client.inboxId)
}

cancelStreamAllConversations() {
if (this.subscriptions[EventTypes.AllConversationMessage]) {
this.subscriptions[EventTypes.AllConversationMessage].remove()
delete this.subscriptions[EventTypes.AllConversationMessage]
}
XMTPModule.unsubscribeFromAllConversationMessages(this.client.inboxId)
}
}

0 comments on commit f4b10aa

Please sign in to comment.