Skip to content

Commit

Permalink
Merge pull request #493 from xmtp/rygine/pppp-update3
Browse files Browse the repository at this point in the history
Use JobRunner to load consent list
  • Loading branch information
rygine authored Nov 15, 2023
2 parents 287bc7b + 92e042e commit bd3b725
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 54 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
"dependencies": {
"@noble/secp256k1": "^1.5.2",
"@xmtp/ecies-bindings-wasm": "^0.1.7",
"@xmtp/proto": "^3.32.0",
"@xmtp/proto": "^3.34.0",
"async-mutex": "^0.4.0",
"elliptic": "^6.5.4",
"ethers": "^5.5.3",
Expand Down
2 changes: 1 addition & 1 deletion src/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ export default class Client<ContentTypes = any> {
backupClient: BackupClient,
keystore: Keystore
) {
this.contacts = new Contacts(this)
this.knownPublicKeyBundles = new Map<
string,
PublicKeyBundle | SignedPublicKeyBundle
Expand All @@ -287,6 +286,7 @@ export default class Client<ContentTypes = any> {
this._maxContentSize = MaxContentSize
this.apiClient = apiClient
this._backupClient = backupClient
this.contacts = new Contacts(this)
}

/**
Expand Down
38 changes: 26 additions & 12 deletions src/Contacts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from './utils'
import Stream from './Stream'
import { OnConnectionLostCallback } from './ApiClient'
import JobRunner from './conversations/JobRunner'

export type ConsentState = 'allowed' | 'denied' | 'unknown'

Expand Down Expand Up @@ -103,18 +104,21 @@ export class ConsentList {
actions: privatePreferences.PrivatePreferencesAction[],
lastTimestampNs?: string
) {
const entries: ConsentListEntry[] = []
actions.forEach((action) => {
action.allow?.walletAddresses.forEach((address) => {
this.allow(address)
entries.push(this.allow(address))
})
action.block?.walletAddresses.forEach((address) => {
this.deny(address)
entries.push(this.deny(address))
})
})

if (lastTimestampNs) {
this.lastEntryTimestamp = fromNanoString(lastTimestampNs)
}

return entries
}

async stream(onConnectionLost?: OnConnectionLostCallback) {
Expand All @@ -140,12 +144,12 @@ export class ConsentList {
)
}

reset() {
// clear existing entries
this.entries.clear()
}

async load(startTime?: Date) {
// no startTime, all entries will be fetched
if (!startTime) {
// clear existing entries
this.entries.clear()
}
const identifier = await this.getIdentifier()
const contentTopic = buildUserPrivatePreferencesTopic(identifier)

Expand All @@ -167,7 +171,7 @@ export class ConsentList {
const actions = await this.decodeMessages(messages)

// update consent list
this.processActions(actions, lastTimestampNs)
return this.processActions(actions, lastTimestampNs)
}

async publish(entries: ConsentListEntry[]) {
Expand Down Expand Up @@ -239,19 +243,29 @@ export class Contacts {
*/
client: Client
private consentList: ConsentList
private jobRunner: JobRunner

constructor(client: Client) {
this.addresses = new Set<string>()
this.consentList = new ConsentList(client)
this.client = client
this.jobRunner = new JobRunner('pppp', client.keystore)
}

async loadConsentList(startTime?: Date) {
await this.consentList.load(startTime)
return this.jobRunner.run(async (lastRun) => {
// allow for override of startTime
return this.consentList.load(startTime ?? lastRun)
})
}

async refreshConsentList() {
await this.loadConsentList()
// clear existing consent list
this.consentList.reset()
// reset last run time to the epoch
await this.jobRunner.resetLastRunTime()
// reload the consent list
return this.loadConsentList()
}

async streamConsentList(onConnectionLost?: OnConnectionLostCallback) {
Expand All @@ -261,15 +275,15 @@ export class Contacts {
/**
* The timestamp of the last entry in the consent list
*/
get lastSyncedAt() {
get lastConsentListEntryTimestamp() {
return this.consentList.lastEntryTimestamp
}

setConsentListEntries(entries: ConsentListEntry[]) {
if (!entries.length) {
return
}
this.consentList.entries.clear()
this.consentList.reset()
entries.forEach((entry) => {
if (entry.permissionType === 'allowed') {
this.consentList.allow(entry.value)
Expand Down
10 changes: 2 additions & 8 deletions src/conversations/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import { SortDirection } from '../ApiClient'
import Long from 'long'
import JobRunner from './JobRunner'

const CLOCK_SKEW_OFFSET_MS = 10000

const messageHasHeaders = (msg: MessageV1): boolean => {
return Boolean(msg.recipientAddress && msg.senderAddress)
}
Expand Down Expand Up @@ -74,9 +72,7 @@ export default class Conversations<ContentTypes = any> {
private async listV1Conversations(): Promise<Conversation<ContentTypes>[]> {
return this.v1JobRunner.run(async (latestSeen) => {
const seenPeers = await this.getIntroductionPeers({
startTime: latestSeen
? new Date(+latestSeen - CLOCK_SKEW_OFFSET_MS)
: undefined,
startTime: latestSeen,
direction: SortDirection.SORT_DIRECTION_ASCENDING,
})

Expand Down Expand Up @@ -144,9 +140,7 @@ export default class Conversations<ContentTypes = any> {
startTime?: Date
): Promise<ConversationV2<ContentTypes>[]> {
const envelopes = await this.client.listInvitations({
startTime: startTime
? new Date(+startTime - CLOCK_SKEW_OFFSET_MS)
: undefined,
startTime,
direction: SortDirection.SORT_DIRECTION_ASCENDING,
})

Expand Down
23 changes: 20 additions & 3 deletions src/conversations/JobRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import { Keystore } from '../keystore'
import Long from 'long'
import { dateToNs, nsToDate } from '../utils'

type JobType = 'v1' | 'v2'
const CLOCK_SKEW_OFFSET_MS = 10000

type JobType = 'v1' | 'v2' | 'pppp'

type UpdateJob<T> = (lastRun: Date | undefined) => Promise<T>

export default class JobRunner {
readonly jobType: JobType
readonly mutex: Mutex
readonly keystore: Keystore
disableOffset: boolean = false

constructor(jobType: JobType, keystore: Keystore) {
this.jobType = jobType
Expand All @@ -27,12 +30,25 @@ export default class JobRunner {
return this.mutex.runExclusive(async () => {
const lastRun = await this.getLastRunTime()
const startTime = new Date()
const result = await callback(lastRun)
const result = await callback(
lastRun
? !this.disableOffset
? new Date(lastRun.getTime() - CLOCK_SKEW_OFFSET_MS)
: lastRun
: undefined
)
await this.setLastRunTime(startTime)
return result
})
}

async resetLastRunTime() {
await this.keystore.setRefreshJob({
jobType: this.protoJobType,
lastRunNs: dateToNs(new Date(0)),
})
}

private async getLastRunTime(): Promise<Date | undefined> {
const { lastRunNs } = await this.keystore.getRefreshJob(
keystore.GetRefreshJobRequest.fromPartial({
Expand All @@ -53,10 +69,11 @@ export default class JobRunner {
}
}

function getProtoJobType(jobType: 'v1' | 'v2'): keystore.JobType {
function getProtoJobType(jobType: JobType): keystore.JobType {
const protoJobType = {
v1: keystore.JobType.JOB_TYPE_REFRESH_V1,
v2: keystore.JobType.JOB_TYPE_REFRESH_V2,
pppp: keystore.JobType.JOB_TYPE_REFRESH_PPPP,
}[jobType]

if (!protoJobType) {
Expand Down
88 changes: 63 additions & 25 deletions test/Contacts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,37 +104,75 @@ describe('Contacts', () => {
})

it('should retrieve consent state', async () => {
await aliceClient.contacts.deny([bob.address])
await aliceClient.contacts.allow([carol.address])
await aliceClient.contacts.allow([bob.address])
await aliceClient.contacts.deny([carol.address])
await aliceClient.contacts.deny([bob.address])
await aliceClient.contacts.allow([carol.address])
const entries = await bobClient.contacts.refreshConsentList()

expect(aliceClient.contacts.consentState(bob.address)).toBe('denied')
expect(aliceClient.contacts.isAllowed(bob.address)).toBe(false)
expect(aliceClient.contacts.isDenied(bob.address)).toBe(true)
expect(entries.length).toBe(0)

expect(aliceClient.contacts.consentState(carol.address)).toBe('allowed')
expect(aliceClient.contacts.isAllowed(carol.address)).toBe(true)
expect(aliceClient.contacts.isDenied(carol.address)).toBe(false)

aliceClient = await Client.create(alice, {
env: 'local',
})
await bobClient.contacts.deny([alice.address])
await bobClient.contacts.allow([carol.address])
await bobClient.contacts.allow([alice.address])
await bobClient.contacts.deny([carol.address])
await bobClient.contacts.deny([alice.address])
await bobClient.contacts.allow([carol.address])

expect(aliceClient.contacts.consentState(bob.address)).toBe('unknown')
expect(aliceClient.contacts.consentState(carol.address)).toBe('unknown')
expect(bobClient.contacts.consentState(alice.address)).toBe('denied')
expect(bobClient.contacts.isAllowed(alice.address)).toBe(false)
expect(bobClient.contacts.isDenied(alice.address)).toBe(true)

await aliceClient.contacts.refreshConsentList()
expect(bobClient.contacts.consentState(carol.address)).toBe('allowed')
expect(bobClient.contacts.isAllowed(carol.address)).toBe(true)
expect(bobClient.contacts.isDenied(carol.address)).toBe(false)

expect(aliceClient.contacts.consentState(bob.address)).toBe('denied')
expect(aliceClient.contacts.isAllowed(bob.address)).toBe(false)
expect(aliceClient.contacts.isDenied(bob.address)).toBe(true)
bobClient = await Client.create(bob, {
env: 'local',
})

expect(aliceClient.contacts.consentState(carol.address)).toBe('allowed')
expect(aliceClient.contacts.isAllowed(carol.address)).toBe(true)
expect(aliceClient.contacts.isDenied(carol.address)).toBe(false)
expect(bobClient.contacts.consentState(alice.address)).toBe('unknown')
expect(bobClient.contacts.consentState(carol.address)).toBe('unknown')

const latestEntries = await bobClient.contacts.refreshConsentList()

expect(latestEntries.length).toBe(6)
expect(latestEntries).toEqual([
{
entryType: 'address',
permissionType: 'denied',
value: alice.address,
},
{
entryType: 'address',
permissionType: 'allowed',
value: carol.address,
},
{
entryType: 'address',
permissionType: 'allowed',
value: alice.address,
},
{
entryType: 'address',
permissionType: 'denied',
value: carol.address,
},
{
entryType: 'address',
permissionType: 'denied',
value: alice.address,
},
{
entryType: 'address',
permissionType: 'allowed',
value: carol.address,
},
])

expect(bobClient.contacts.consentState(alice.address)).toBe('denied')
expect(bobClient.contacts.isAllowed(alice.address)).toBe(false)
expect(bobClient.contacts.isDenied(alice.address)).toBe(true)

expect(bobClient.contacts.consentState(carol.address)).toBe('allowed')
expect(bobClient.contacts.isAllowed(carol.address)).toBe(true)
expect(bobClient.contacts.isDenied(carol.address)).toBe(false)
})

it('should stream consent updates', async () => {
Expand Down
19 changes: 19 additions & 0 deletions test/conversations/JobRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,23 @@ describe('JobRunner', () => {
})
).rejects.toThrow('foo')
})

it('resets the last run time', async () => {
const ppppRunner = new JobRunner('pppp', keystore)
await ppppRunner.run(async () => {})

const { lastRunNs: ppppLastRunNs } = await keystore.getRefreshJob({
jobType: keystoreProto.JobType.JOB_TYPE_REFRESH_PPPP,
})

expect(ppppLastRunNs.gt(0)).toBeTruthy()

await ppppRunner.resetLastRunTime()

const { lastRunNs: ppppLastRunNs2 } = await keystore.getRefreshJob({
jobType: keystoreProto.JobType.JOB_TYPE_REFRESH_PPPP,
})

expect(ppppLastRunNs2.eq(0)).toBeTruthy()
})
})

0 comments on commit bd3b725

Please sign in to comment.