Skip to content

Commit

Permalink
Merge pull request #191 from xmtp/stream-all-messages
Browse files Browse the repository at this point in the history
streamAllMessages Fixes
  • Loading branch information
neekolas authored Oct 19, 2022
2 parents b042ef0 + 343112a commit a8fafe8
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 69 deletions.
2 changes: 1 addition & 1 deletion src/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ export class MessageV2 extends MessageBase implements proto.MessageV2 {
header: proto.MessageHeaderV2,
signed: proto.SignedContent,
// wallet address derived from the signature of the message sender
senderAddress: string | undefined
senderAddress: string
) {
super(id, bytes, obj)
this.decrypted = signed.payload
Expand Down
12 changes: 9 additions & 3 deletions src/conversations/Conversation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { messageApi, xmtpEnvelope, fetcher } from '@xmtp/proto'
import { encrypt, decrypt, SignedPublicKey, Signature } from '../crypto'
import Ciphertext from '../crypto/Ciphertext'
import { sha256 } from '../crypto/encryption'
import { dateToNs, nsToDate } from '../utils'
import { buildDirectMessageTopic, dateToNs, nsToDate } from '../utils'
const { b64Decode } = fetcher

/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
Expand All @@ -41,6 +41,10 @@ export class ConversationV1 {
return this.client.listConversationMessages(this.peerAddress, opts)
}

get topic(): string {
return buildDirectMessageTopic(this.peerAddress, this.client.address)
}

messagesPaginated(
opts?: ListMessagesPaginatedOptions
): AsyncGenerator<MessageV1[]> {
Expand Down Expand Up @@ -180,8 +184,8 @@ export class ConversationV2 {
return msg
}

private async decodeMessage(env: messageApi.Envelope): Promise<MessageV2> {
if (!env.message) {
async decodeMessage(env: messageApi.Envelope): Promise<MessageV2> {
if (!env.message || !env.contentTopic) {
throw new Error('empty envelope')
}
const messageBytes = b64Decode(env.message.toString())
Expand Down Expand Up @@ -210,6 +214,7 @@ export class ConversationV2 {
) {
throw new Error('incomplete signed content')
}

const digest = await sha256(concat(msgv2.headerBytes, signed.payload))
if (
!new SignedPublicKey(signed.sender?.preKey).verify(
Expand All @@ -220,6 +225,7 @@ export class ConversationV2 {
throw new Error('invalid signature')
}
const message = await MessageV2.create(msg, header, signed, messageBytes)
message.contentTopic = env.contentTopic
await this.client.decodeContent(message)
return message
}
Expand Down
134 changes: 99 additions & 35 deletions src/conversations/Conversations.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { SignedPublicKeyBundle } from './../crypto/PublicKeyBundle'
import { InvitationContext } from './../Invitation'
import { Conversation, ConversationV1, ConversationV2 } from './Conversation'
import { MessageV1 } from '../Message'
import { Message, MessageV1, MessageV2 } from '../Message'
import Stream from '../Stream'
import Client from '../Client'
import {
Expand Down Expand Up @@ -98,55 +98,119 @@ export default class Conversations {
}

/**
* Returns a stream for all new messages from existing and new conversations.
* Streams messages from all conversations.
*
* When a new conversation is initiated with the client's address, this function will automatically register it and add it to the list of conversations to watch.
* Callers should be aware the first messages in a newly created conversation are picked up on a best effort basis and there are other potential race conditions which may cause some newly created conversations to be missed.
*
*/
async streamAllMessages(): Promise<Stream<MessageV1>> {
const conversations = await this.list()
const dmAddresses: string[] = []
for (const conversation of conversations) {
dmAddresses.push(conversation.peerAddress)
}
async streamAllMessages(): Promise<AsyncGenerator<Message>> {
const introTopic = buildUserIntroTopic(this.client.address)
const topics = this.buildTopicsForAllMessages(dmAddresses, introTopic)
const inviteTopic = buildUserInviteTopic(this.client.address)
const topics = new Set<string>([introTopic, inviteTopic])
const convoMap = new Map<string, Conversation>()

for (const conversation of await this.list()) {
topics.add(conversation.topic)
convoMap.set(conversation.topic, conversation)
}

const decodeMessage = async (
env: messageApi.Envelope
): Promise<Conversation | Message | null> => {
const contentTopic = env.contentTopic
if (!contentTopic) {
return null
}

if (contentTopic === introTopic) {
const msg = await this.client.decodeEnvelope(env)
if (!messageHasHeaders(msg)) {
return null
}

// If we detect a new intro topic, update the stream's direct message topics to include the new topic
const contentTopicUpdater = (msg: MessageV1): string[] | undefined => {
if (msg.contentTopic !== introTopic || !messageHasHeaders(msg)) {
return undefined
return msg
}
const peerAddress = this.getPeerAddress(msg)
if (
dmAddresses.includes(peerAddress) ||
peerAddress === this.client.address
) {
// No need to update if we're already subscribed
return undefined

// Decode as an invite and return the envelope
// This gives the contentTopicUpdater everything it needs to add to the topic list
if (contentTopic === inviteTopic) {
const sealed = await SealedInvitation.fromEnvelope(env)
const unsealed = await sealed.v1.getInvitation(this.client.keys)
return ConversationV2.create(this.client, unsealed, sealed.v1.header)
}
dmAddresses.push(peerAddress)
return this.buildTopicsForAllMessages(dmAddresses, introTopic)

const convo = convoMap.get(contentTopic)

// Decode as a V1 message if the topic matches a V1 convo
if (convo instanceof ConversationV1) {
return this.client.decodeEnvelope(env)
}

// Decode as a V2 message if the topic matches a V2 convo
if (convo instanceof ConversationV2) {
return convo.decodeMessage(env)
}

console.log('Unknown topic')

throw new Error('Unknown topic')
}

// Filter intro topics if already streaming direct messages for that address to avoid duplicates
const filter = (msg: MessageV1): boolean => {
if (
msg.contentTopic === introTopic &&
messageHasHeaders(msg) &&
dmAddresses.includes(this.getPeerAddress(msg))
) {
const addConvo = (topic: string, conversation: Conversation): boolean => {
if (topics.has(topic)) {
return false
}
convoMap.set(topic, conversation)
topics.add(topic)
return true
}

return Stream.create<MessageV1>(
const contentTopicUpdater = (msg: Conversation | Message | null) => {
// If we have a V1 message from the introTopic, store the conversation in our mapping
if (msg instanceof MessageV1 && msg.contentTopic === introTopic) {
const convo = new ConversationV1(
this.client,
msg.recipientAddress === this.client.address
? (msg.senderAddress as string)
: (msg.recipientAddress as string),
msg.sent
)
const isNew = addConvo(convo.topic, convo)

return isNew ? Array.from(topics.values()) : undefined
}

if (msg instanceof ConversationV2) {
const isNew = addConvo(msg.topic, msg)

return isNew ? Array.from(topics.values()) : undefined
}

return undefined
}

const str = await Stream.create<Message | Conversation | null>(
this.client,
topics,
async (env) => {
const msg = await this.client.decodeEnvelope(env)
return filter(msg) ? msg : undefined
},
Array.from(topics.values()),
decodeMessage,
contentTopicUpdater
)

return (async function* generate() {
for await (const val of str) {
if (val instanceof MessageV1 || val instanceof MessageV2) {
yield val
}
// For conversation V2, we may have messages in the new topic before we started streaming.
// To be safe, we fetch all messages
if (val instanceof ConversationV2) {
for (const convoMessage of await val.messages()) {
yield convoMessage
}
}
}
})()
}

private async getIntroductionPeers(): Promise<Map<string, Date>> {
Expand Down
64 changes: 34 additions & 30 deletions test/conversations/Conversations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,9 @@ describe('conversations', () => {
charlie.address
)
const bobAlice = await bob.conversations.newConversation(alice.address)
const charlieAlice = await charlie.conversations.newConversation(
alice.address
)

const stream = await alice.conversations.streamAllMessages()
await charlieAlice.send('gm alice -charlie')
await aliceCharlie.send('gm alice -charlie')

let numMessages = 0
for await (const message of stream) {
Expand All @@ -99,37 +96,44 @@ describe('conversations', () => {
expect(numMessages).toBe(3)
})

it('streams all conversation messages with existing conversations', async () => {
const aliceBob = await alice.conversations.newConversation(bob.address)
const bobAlice = await bob.conversations.newConversation(alice.address)

await aliceBob.send('gm alice -bob')
it('streams all conversation messages with a mix of v1 and v2 conversations', async () => {
const aliceBobV1 = await alice.conversations.newConversation(bob.address)
const aliceBobV2 = await alice.conversations.newConversation(bob.address, {
conversationId: 'xmtp.org/foo',
metadata: {},
})
await sleep(100)
const existingConversations = await alice.conversations.list()
expect(existingConversations).toHaveLength(1)

const stream = await alice.conversations.streamAllMessages()
await bobAlice.send('gm bob -alice')
await sleep(100)

let numMessages = 0
for await (const message of stream) {
numMessages++
if (numMessages == 1) {
expect(message.contentTopic).toBe(
buildDirectMessageTopic(alice.address, bob.address)
)
expect(message.content).toBe('gm bob -alice')
}
if (numMessages == 2) {
expect(message.contentTopic).toBe(
buildDirectMessageTopic(alice.address, bob.address)
)
expect(message.content).toBe('gm. hope you have a good day')
break
await aliceBobV1.send('V1')
const message1 = await stream.next()
expect(message1.value.content).toBe('V1')
expect(message1.value.contentTopic).toBe(buildUserIntroTopic(alice.address))

await aliceBobV2.send('V2')
const message2 = await stream.next()
expect(message2.value.content).toBe('V2')
expect(message2.value.contentTopic).toBe(aliceBobV2.topic)

await aliceBobV1.send('Second message in V1 channel')
const message3 = await stream.next()
expect(message3.value.content).toBe('Second message in V1 channel')
expect(message3.value.contentTopic).toBe(
buildDirectMessageTopic(alice.address, bob.address)
)

const aliceBobV2Bar = await alice.conversations.newConversation(
bob.address,
{
conversationId: 'xmtp.org/bar',
metadata: {},
}
await aliceBob.send('gm. hope you have a good day')
}
expect(numMessages).toBe(2)
)
await aliceBobV2Bar.send('bar')
const message4 = await stream.next()
expect(message4.value.content).toBe('bar')
})

it('dedupes conversations when multiple messages are in the introduction topic', async () => {
Expand Down

0 comments on commit a8fafe8

Please sign in to comment.