diff --git a/src/Contacts.ts b/src/Contacts.ts index 8c89980e5..4a5e42c49 100644 --- a/src/Contacts.ts +++ b/src/Contacts.ts @@ -5,6 +5,8 @@ import { buildUserPrivatePreferencesTopic, fromNanoString, } from './utils' +import Stream from './Stream' +import { OnConnectionLostCallback } from './ApiClient' export type ConsentState = 'allowed' | 'denied' | 'unknown' @@ -77,30 +79,7 @@ export class ConsentList { return this._identifier } - async load(startTime?: Date) { - // no startTime, all entries will be fetched - if (!startTime) { - // clear existing entries - this.entries.clear() - } - const identifier = await this.getIdentifier() - const contentTopic = buildUserPrivatePreferencesTopic(identifier) - - let lastTimestampNs: string | undefined - - const messages = await this.client.listEnvelopes( - contentTopic, - async ({ message, timestampNs }: EnvelopeWithMessage) => { - if (timestampNs) { - lastTimestampNs = timestampNs - } - return message - }, - { - startTime, - } - ) - + async decodeMessages(messages: Uint8Array[]) { // decrypt messages const { responses } = await this.client.keystore.selfDecrypt({ requests: messages.map((message) => ({ payload: message })), @@ -117,7 +96,13 @@ export class ConsentList { : result }, [] as PrivatePreferencesAction[]) - // update consent list entries + return actions + } + + processActions( + actions: privatePreferences.PrivatePreferencesAction[], + lastTimestampNs?: string + ) { actions.forEach((action) => { action.allow?.walletAddresses.forEach((address) => { this.allow(address) @@ -132,6 +117,59 @@ export class ConsentList { } } + async stream(onConnectionLost?: OnConnectionLostCallback) { + const identifier = await this.getIdentifier() + const contentTopic = buildUserPrivatePreferencesTopic(identifier) + + return Stream.create( + this.client, + [contentTopic], + async (envelope) => { + if (!envelope.message) { + return undefined + } + const actions = await this.decodeMessages([envelope.message]) + + // update consent list + this.processActions(actions, envelope.timestampNs) + + return actions[0] + }, + undefined, + onConnectionLost + ) + } + + async load(startTime?: Date) { + // no startTime, all entries will be fetched + if (!startTime) { + // clear existing entries + this.entries.clear() + } + const identifier = await this.getIdentifier() + const contentTopic = buildUserPrivatePreferencesTopic(identifier) + + let lastTimestampNs: string | undefined + + const messages = await this.client.listEnvelopes( + contentTopic, + async ({ message, timestampNs }: EnvelopeWithMessage) => { + if (timestampNs) { + lastTimestampNs = timestampNs + } + return message + }, + { + startTime, + } + ) + + const actions = await this.decodeMessages(messages) + + // update consent list + this.processActions(actions, lastTimestampNs) + } + async publish(entries: ConsentListEntry[]) { const identifier = await this.getIdentifier() @@ -216,6 +254,10 @@ export class Contacts { await this.loadConsentList() } + async streamConsentList(onConnectionLost?: OnConnectionLostCallback) { + return this.consentList.stream(onConnectionLost) + } + /** * The timestamp of the last entry in the consent list */ diff --git a/test/Contacts.test.ts b/test/Contacts.test.ts index 8024c5b08..2f28369f1 100644 --- a/test/Contacts.test.ts +++ b/test/Contacts.test.ts @@ -1,3 +1,4 @@ +import { privatePreferences } from '@xmtp/proto' import Client from '../src/Client' import { Contacts } from '../src/Contacts' import { newWallet } from './helpers' @@ -135,4 +136,20 @@ describe('Contacts', () => { expect(aliceClient.contacts.isAllowed(carol.address)).toBe(true) expect(aliceClient.contacts.isDenied(carol.address)).toBe(false) }) + + it('should stream consent updates', async () => { + const aliceStream = await aliceClient.contacts.streamConsentList() + await aliceClient.conversations.newConversation(bob.address) + + let numActions = 0 + const actions: privatePreferences.PrivatePreferencesAction[] = [] + for await (const action of aliceStream) { + numActions++ + expect(action.block).toBeUndefined() + expect(action.allow?.walletAddresses).toEqual([bob.address]) + break + } + expect(numActions).toBe(1) + await aliceStream.return() + }) })