Skip to content

Commit

Permalink
Merge branch 'beta' into rygine/export-pppp-type
Browse files Browse the repository at this point in the history
  • Loading branch information
rygine authored Nov 13, 2023
2 parents 1f9ce8e + 6bceb1c commit 48558bc
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 25 deletions.
92 changes: 67 additions & 25 deletions src/Contacts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {
buildUserPrivatePreferencesTopic,
fromNanoString,
} from './utils'
import Stream from './Stream'
import { OnConnectionLostCallback } from './ApiClient'

export type ConsentState = 'allowed' | 'denied' | 'unknown'

Expand Down Expand Up @@ -77,30 +79,7 @@ export class ConsentList {
return this._identifier
}

async load(startTime?: Date) {
// no startTime, all entries will be fetched
if (!startTime) {
// clear existing entries
this.entries.clear()
}
const identifier = await this.getIdentifier()
const contentTopic = buildUserPrivatePreferencesTopic(identifier)

let lastTimestampNs: string | undefined

const messages = await this.client.listEnvelopes(
contentTopic,
async ({ message, timestampNs }: EnvelopeWithMessage) => {
if (timestampNs) {
lastTimestampNs = timestampNs
}
return message
},
{
startTime,
}
)

async decodeMessages(messages: Uint8Array[]) {
// decrypt messages
const { responses } = await this.client.keystore.selfDecrypt({
requests: messages.map((message) => ({ payload: message })),
Expand All @@ -117,7 +96,13 @@ export class ConsentList {
: result
}, [] as PrivatePreferencesAction[])

// update consent list entries
return actions
}

processActions(
actions: privatePreferences.PrivatePreferencesAction[],
lastTimestampNs?: string
) {
actions.forEach((action) => {
action.allow?.walletAddresses.forEach((address) => {
this.allow(address)
Expand All @@ -132,6 +117,59 @@ export class ConsentList {
}
}

async stream(onConnectionLost?: OnConnectionLostCallback) {
const identifier = await this.getIdentifier()
const contentTopic = buildUserPrivatePreferencesTopic(identifier)

return Stream.create<privatePreferences.PrivatePreferencesAction>(
this.client,
[contentTopic],
async (envelope) => {
if (!envelope.message) {
return undefined
}
const actions = await this.decodeMessages([envelope.message])

// update consent list
this.processActions(actions, envelope.timestampNs)

return actions[0]
},
undefined,
onConnectionLost
)
}

async load(startTime?: Date) {
// no startTime, all entries will be fetched
if (!startTime) {
// clear existing entries
this.entries.clear()
}
const identifier = await this.getIdentifier()
const contentTopic = buildUserPrivatePreferencesTopic(identifier)

let lastTimestampNs: string | undefined

const messages = await this.client.listEnvelopes(
contentTopic,
async ({ message, timestampNs }: EnvelopeWithMessage) => {
if (timestampNs) {
lastTimestampNs = timestampNs
}
return message
},
{
startTime,
}
)

const actions = await this.decodeMessages(messages)

// update consent list
this.processActions(actions, lastTimestampNs)
}

async publish(entries: ConsentListEntry[]) {
const identifier = await this.getIdentifier()

Expand Down Expand Up @@ -216,6 +254,10 @@ export class Contacts {
await this.loadConsentList()
}

async streamConsentList(onConnectionLost?: OnConnectionLostCallback) {
return this.consentList.stream(onConnectionLost)
}

/**
* The timestamp of the last entry in the consent list
*/
Expand Down
17 changes: 17 additions & 0 deletions test/Contacts.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { privatePreferences } from '@xmtp/proto'
import Client from '../src/Client'
import { Contacts } from '../src/Contacts'
import { newWallet } from './helpers'
Expand Down Expand Up @@ -135,4 +136,20 @@ describe('Contacts', () => {
expect(aliceClient.contacts.isAllowed(carol.address)).toBe(true)
expect(aliceClient.contacts.isDenied(carol.address)).toBe(false)
})

it('should stream consent updates', async () => {
const aliceStream = await aliceClient.contacts.streamConsentList()
await aliceClient.conversations.newConversation(bob.address)

let numActions = 0
const actions: privatePreferences.PrivatePreferencesAction[] = []
for await (const action of aliceStream) {
numActions++
expect(action.block).toBeUndefined()
expect(action.allow?.walletAddresses).toEqual([bob.address])
break
}
expect(numActions).toBe(1)
await aliceStream.return()
})
})

0 comments on commit 48558bc

Please sign in to comment.