Skip to content

Commit

Permalink
Merge pull request #142 from xmtp/ea/all-messages
Browse files Browse the repository at this point in the history
Introduce API to stream all messages for all conversations
  • Loading branch information
Elise Alix authored Aug 4, 2022
2 parents aca65df + 0bf89e9 commit a0a0c31
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 30 deletions.
29 changes: 15 additions & 14 deletions src/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
promiseWithTimeout,
} from './utils'
import { sleep } from '../test/helpers'
import Stream, { MessageFilter } from './Stream'
import Stream, { MessageFilter, noTransformation } from './Stream'
import { Signer } from 'ethers'
import {
EncryptedStore,
Expand Down Expand Up @@ -385,7 +385,10 @@ export default class Client {
return Message.encode(this.keys, recipient, payload, timestamp)
}

async decodeMessage(payload: Uint8Array): Promise<Message> {
async decodeMessage(
payload: Uint8Array,
contentTopic: string | undefined
): Promise<Message> {
const message = await Message.decode(this.keys, payload)
if (message.error) {
return message
Expand All @@ -400,6 +403,7 @@ export default class Client {
}
const contentType = new ContentTypeId(encoded.type)
const codec = this.codecFor(contentType)
message.contentTopic = contentTopic
if (codec) {
message.content = codec.decode(encoded as EncodedContent, this)
message.contentType = contentType
Expand All @@ -416,18 +420,18 @@ export default class Client {
streamIntroductionMessages(): Promise<Stream<Message>> {
return Stream.create<Message>(
this,
buildUserIntroTopic(this.address),
[buildUserIntroTopic(this.address)],
noTransformation
)
}

streamConversationMessages(peerAddress: string): Promise<Stream<Message>> {
const topic = buildDirectMessageTopic(peerAddress, this.address)
const topics = [buildDirectMessageTopic(peerAddress, this.address)]
return Stream.create<Message>(
this,
topic,
topics,
noTransformation,
filterForTopic(topic)
filterForTopics(topics)
)
}

Expand Down Expand Up @@ -476,11 +480,11 @@ export default class Client {
wakuMsgs = wakuMsgs.filter((wakuMsg) => wakuMsg?.payload)
let msgs = await Promise.all(
wakuMsgs.map((wakuMsg) =>
this.decodeMessage(wakuMsg.payload as Uint8Array)
this.decodeMessage(wakuMsg.payload as Uint8Array, wakuMsg.contentTopic)
)
)
if (opts?.checkAddresses) {
msgs = msgs.filter(filterForTopic(topic))
msgs = msgs.filter(filterForTopics([topic]))
}
return msgs
}
Expand Down Expand Up @@ -627,18 +631,15 @@ async function getNodeList(env: keyof NodesList): Promise<string[]> {
return Object.values(nodesList[env])
}

function noTransformation(msg: Message) {
return msg
}

function filterForTopic(topic: string): MessageFilter {
// Ensure the message didn't have a spoofed address
function filterForTopics(topics: string[]): MessageFilter {
return (msg) => {
const senderAddress = msg.senderAddress
const recipientAddress = msg.recipientAddress
return (
senderAddress !== undefined &&
recipientAddress !== undefined &&
buildDirectMessageTopic(senderAddress, recipientAddress) === topic
topics.includes(buildDirectMessageTopic(senderAddress, recipientAddress))
)
}
}
1 change: 1 addition & 0 deletions src/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export default class Message implements proto.V1Message {
// the message receiving APIs need to return a Message to provide access to the header fields like sender/recipient
contentType?: ContentTypeId
content?: any // eslint-disable-line @typescript-eslint/no-explicit-any
contentTopic?: string // content topic that triggered the message
error?: Error
/**
* Identifier that is deterministically derived from the bytes of the message
Expand Down
66 changes: 54 additions & 12 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ export type MessageTransformer<T> = (msg: Message) => T

export type MessageFilter = (msg: Message) => boolean

export type ContentTopicUpdater = (msg: Message) => string[] | undefined

export const noTransformation = (msg: Message): Message => {
return msg
}

/**
* Stream implements an Asynchronous Iterable over messages received from a topic.
* As such can be used with constructs like for-await-of, yield*, array destructing, etc.
*/
export default class Stream<T> {
topic: string
topics: string[]
client: Client
// queue of incoming Waku messages
messages: T[]
Expand All @@ -31,31 +37,47 @@ export default class Stream<T> {

constructor(
client: Client,
topic: string,
topics: string[],
messageTransformer: MessageTransformer<T>,
messageFilter?: MessageFilter
messageFilter?: MessageFilter,
contentTopicUpdater?: ContentTopicUpdater
) {
this.messages = []
this.resolvers = []
this.topic = topic
this.topics = topics
this.client = client
this.callback = this.newMessageCallback(messageTransformer, messageFilter)
this.callback = this.newMessageCallback(
messageTransformer,
messageFilter,
contentTopicUpdater
)
}

// returns new closure to handle incoming Waku messages
private newMessageCallback(
transformer: MessageTransformer<T>,
filter?: MessageFilter
filter?: MessageFilter,
contentTopicUpdater?: ContentTopicUpdater
): (wakuMsg: WakuMessage) => Promise<void> {
return async (wakuMsg: WakuMessage) => {
if (!wakuMsg.payload) {
return
}
const msg = await this.client.decodeMessage(wakuMsg.payload)
const msg = await this.client.decodeMessage(
wakuMsg.payload,
wakuMsg.contentTopic
)
// If there is a filter on the stream, and the filter returns false, ignore the message
if (filter && !filter(msg)) {
return
}
// Check to see if we should update the stream's content topic subscription
if (contentTopicUpdater) {
const topics = contentTopicUpdater(msg)
if (topics) {
this.resubscribeToTopics(topics)
}
}
// is there a Promise already pending?
const resolver = this.resolvers.pop()
if (resolver) {
Expand All @@ -75,7 +97,7 @@ export default class Stream<T> {

this.unsubscribeFn = await this.client.waku.filter.subscribe(
this.callback,
[this.topic]
this.topics
)
await this.listenForDisconnect()
}
Expand All @@ -93,7 +115,7 @@ export default class Stream<T> {
}
this.unsubscribeFn = await this.client.waku.filter.subscribe(
this.callback,
[this.topic]
this.topics
)
console.log(`Connection to peer ${connection.remoteAddr} restored`)
return
Expand All @@ -113,11 +135,18 @@ export default class Stream<T> {

static async create<T>(
client: Client,
topic: string,
topics: string[],
messageTransformer: MessageTransformer<T>,
messageFilter?: MessageFilter
messageFilter?: MessageFilter,
contentTopicUpdater?: ContentTopicUpdater
): Promise<Stream<T>> {
const stream = new Stream(client, topic, messageTransformer, messageFilter)
const stream = new Stream(
client,
topics,
messageTransformer,
messageFilter,
contentTopicUpdater
)
await stream.start()
return stream
}
Expand Down Expand Up @@ -167,4 +196,17 @@ export default class Stream<T> {
// otherwise return empty Promise and queue its resolver
return new Promise((resolve) => this.resolvers.unshift(resolve))
}

// Unsubscribe from the existing content topics and resubscribe to the given topics.
private async resubscribeToTopics(topics: string[]): Promise<void> {
if (!this.callback || !this.unsubscribeFn) {
throw new Error('Missing callback for stream')
}
await this.unsubscribeFn()
this.topics = topics
this.unsubscribeFn = await this.client.waku.filter.subscribe(
this.callback,
this.topics
)
}
}
74 changes: 71 additions & 3 deletions src/conversations/Conversations.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import Conversation from './Conversation'
import Message from '../Message'
import Stream, { MessageFilter, MessageTransformer } from '../Stream'
import Stream, {
MessageFilter,
MessageTransformer,
noTransformation,
} from '../Stream'
import Client from '../Client'
import { buildUserIntroTopic } from '../utils'
import { buildDirectMessageTopic, buildUserIntroTopic } from '../utils'

const messageHasHeaders: MessageFilter = (msg: Message) => {
return Boolean(msg.recipientAddress && msg.senderAddress)
Expand Down Expand Up @@ -74,12 +78,76 @@ export default class Conversations {

return Stream.create<Conversation>(
this.client,
buildUserIntroTopic(this.client.address),
[buildUserIntroTopic(this.client.address)],
messageTransformer,
filter
)
}

/**
* Returns a stream for all new messages from existing and new conversations.
*/
async streamAllMessages(): Promise<Stream<Message>> {
const conversations = await this.list()
const dmAddresses: string[] = conversations.map(
(conversation) => conversation.peerAddress
)
const introTopic = buildUserIntroTopic(this.client.address)
const topics = this.buildTopicsForAllMessages(dmAddresses, introTopic)

// If we detect a new intro topic, update the stream's direct message topics to include the new topic
const contentTopicUpdater = (msg: Message): string[] | undefined => {
if (msg.contentTopic !== introTopic || !messageHasHeaders(msg)) {
return undefined
}
const peerAddress = this.getPeerAddress(msg)
if (
dmAddresses.includes(peerAddress) ||
peerAddress === this.client.address
) {
// No need to update if we're already subscribed
return undefined
}
dmAddresses.push(peerAddress)
return this.buildTopicsForAllMessages(dmAddresses, introTopic)
}

// Filter intro topics if already streaming direct messages for that address to avoid duplicates
const filter = (msg: Message): boolean => {
if (
msg.contentTopic === introTopic &&
messageHasHeaders(msg) &&
dmAddresses.includes(this.getPeerAddress(msg))
) {
return false
}
return true
}

return Stream.create<Message>(
this.client,
topics,
noTransformation,
filter,
contentTopicUpdater
)
}

/**
* Builds a list of topics for existing conversations and new intro topics
*/
private buildTopicsForAllMessages(
peerAddresses: string[],
introTopic: string
): string[] {
const topics = peerAddresses.map((address) =>
buildDirectMessageTopic(address, this.client.address)
)
// Ensure we listen for new conversation topics as well
topics.push(introTopic)
return topics
}

/**
* Creates a new conversation for the given address. Will throw an error if the peer is not found in the XMTP network
*/
Expand Down
Loading

0 comments on commit a0a0c31

Please sign in to comment.