Skip to content

Commit

Permalink
Allow for optional callbacks for streams
Browse files Browse the repository at this point in the history
  • Loading branch information
rygine committed Jun 13, 2024
1 parent 5c9703d commit 7ad12c1
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 22 deletions.
18 changes: 11 additions & 7 deletions packages/mls-client/src/Conversation.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import type {
NapiGroup,
NapiListMessagesOptions,
NapiMessage,
} from '@xmtp/mls-client-bindings-node'
import type { ContentTypeId } from '@xmtp/xmtp-js'
import { AsyncStream } from '@/AsyncStream'
import { AsyncStream, type StreamCallback } from '@/AsyncStream'
import type { Client } from '@/Client'
import { DecodedMessage } from '@/DecodedMessage'
import { nsToDate } from '@/helpers/date'
Expand Down Expand Up @@ -78,12 +77,17 @@ export class Conversation {
return this.#group.sync()
}

stream() {
const asyncStream = new AsyncStream<NapiMessage, DecodedMessage>(
(message) => new DecodedMessage(this.#client, message)
)
const stream = this.#group.stream(asyncStream.callback)
stream(callback?: StreamCallback<DecodedMessage>) {
const asyncStream = new AsyncStream<DecodedMessage>()

const stream = this.#group.stream((err, message) => {
const decodedMessage = new DecodedMessage(this.#client, message)
asyncStream.callback(err, decodedMessage)
callback?.(err, decodedMessage)
})

asyncStream.stopCallback = stream.end.bind(stream)

return asyncStream
}

Expand Down
52 changes: 37 additions & 15 deletions packages/mls-client/src/Conversations.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
import type {
GroupPermissions,
NapiConversations,
NapiGroup,
NapiListMessagesOptions,
NapiMessage,
} from '@xmtp/mls-client-bindings-node'
import { AsyncStream } from '@/AsyncStream'
import { AsyncStream, type StreamCallback } from '@/AsyncStream'
import type { Client } from '@/Client'
import { Conversation } from '@/Conversation'
import { DecodedMessage } from '@/DecodedMessage'

export class Conversations {
#client: Client
#conversations: NapiConversations
#map: Map<string, Conversation>

constructor(client: Client, conversations: NapiConversations) {
this.#client = client
this.#conversations = conversations
this.#map = new Map()
}

get(id: string) {
return this.#map.get(id)
}

async newConversation(
Expand All @@ -27,35 +31,53 @@ export class Conversations {
accountAddresses,
permissions
)
return new Conversation(this.#client, group)
const conversation = new Conversation(this.#client, group)
this.#map.set(conversation.id, conversation)
return conversation
}

async list(options?: NapiListMessagesOptions) {
const groups = await this.#conversations.list(options)
return groups.map((group) => new Conversation(this.#client, group))
return groups.map((group) => {
const conversation = new Conversation(this.#client, group)
this.#map.set(conversation.id, conversation)
return conversation
})
}

async sync() {
return this.#conversations.sync()
}

stream() {
const asyncStream = new AsyncStream<NapiGroup, Conversation>(
(group) => new Conversation(this.#client, group)
)
const stream = this.#conversations.stream(asyncStream.callback)
stream(callback?: StreamCallback<Conversation>) {
const asyncStream = new AsyncStream<Conversation>()

const stream = this.#conversations.stream((err, group) => {
const conversation = new Conversation(this.#client, group)
this.#map.set(conversation.id, conversation)
asyncStream.callback(err, conversation)
callback?.(err, conversation)
})

asyncStream.stopCallback = stream.end.bind(stream)

return asyncStream
}

async streamAllMessages() {
async streamAllMessages(callback?: StreamCallback<DecodedMessage>) {
// sync conversations first
await this.sync()
const asyncStream = new AsyncStream<NapiMessage, DecodedMessage>(
(message) => new DecodedMessage(this.#client, message)
)
const stream = this.#conversations.streamAllMessages(asyncStream.callback)

const asyncStream = new AsyncStream<DecodedMessage>()

const stream = this.#conversations.streamAllMessages((err, message) => {
const decodedMessage = new DecodedMessage(this.#client, message)
asyncStream.callback(err, decodedMessage)
callback?.(err, decodedMessage)
})

asyncStream.stopCallback = stream.end.bind(stream)

return asyncStream
}
}
7 changes: 7 additions & 0 deletions packages/mls-client/test/Conversations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ describe('Conversations', () => {
user2.account.address,
])
expect(conversation).toBeDefined()
expect(client1.conversations.get(conversation.id)?.id).toBe(conversation.id)
expect(conversation.id).toBeDefined()
expect(conversation.createdAt).toBeDefined()
expect(conversation.createdAtNs).toBeDefined()
Expand Down Expand Up @@ -75,6 +76,12 @@ describe('Conversations', () => {
}
}
stream.stop()
expect(client3.conversations.get(conversation1.id)?.id).toBe(
conversation1.id
)
expect(client3.conversations.get(conversation2.id)?.id).toBe(
conversation2.id
)
})

it('should stream all messages', async () => {
Expand Down

0 comments on commit 7ad12c1

Please sign in to comment.