Skip to content

Commit

Permalink
Persistence Store - be more perscriptive about what we store in local…
Browse files Browse the repository at this point in the history
… storage

when scrolling backwards, we don’t care about any “state” events that have already been snapshotted, so don’t write them. we really only care about channel messages so that we don’t have to refetch them
  • Loading branch information
texuf committed Oct 21, 2024
1 parent 59199e4 commit b32828b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 10 deletions.
6 changes: 5 additions & 1 deletion packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1881,7 +1881,11 @@ export class Client
const unpackedMiniblock = await unpackMiniblock(miniblock, this.unpackEnvelopeOpts)
unpackedMiniblocks.push(unpackedMiniblock)
}
await this.persistenceStore.saveMiniblocks(streamIdAsString(streamId), unpackedMiniblocks)
await this.persistenceStore.saveMiniblocks(
streamIdAsString(streamId),
unpackedMiniblocks,
'backward',
)
return {
terminus: response.terminus,
miniblocks: [...unpackedMiniblocks, ...cachedMiniblocks],
Expand Down
22 changes: 17 additions & 5 deletions packages/sdk/src/persistenceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ export interface IPersistenceStore {
>
saveSyncedStream(streamId: string, syncedStream: PersistedSyncedStream): Promise<void>
saveMiniblock(streamId: string, miniblock: ParsedMiniblock): Promise<void>
saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]): Promise<void>
saveMiniblocks(
streamId: string,
miniblocks: ParsedMiniblock[],
direction: 'forward' | 'backward',
): Promise<void>
getMiniblock(streamId: string, miniblockNum: bigint): Promise<ParsedMiniblock | undefined>
getMiniblocks(
streamId: string,
Expand Down Expand Up @@ -147,21 +151,25 @@ export class PersistenceStore extends Dexie implements IPersistenceStore {

async saveMiniblock(streamId: string, miniblock: ParsedMiniblock) {
log('saving miniblock', streamId)
const cachedMiniblock = parsedMiniblockToPersistedMiniblock(miniblock)
const cachedMiniblock = parsedMiniblockToPersistedMiniblock(miniblock, 'forward')
await this.miniblocks.put({
streamId: streamId,
miniblockNum: miniblock.header.miniblockNum.toString(),
data: cachedMiniblock.toBinary(),
})
}

async saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]) {
async saveMiniblocks(
streamId: string,
miniblocks: ParsedMiniblock[],
direction: 'forward' | 'backward',
) {
await this.miniblocks.bulkPut(
miniblocks.map((mb) => {
return {
streamId: streamId,
miniblockNum: mb.header.miniblockNum.toString(),
data: parsedMiniblockToPersistedMiniblock(mb).toBinary(),
data: parsedMiniblockToPersistedMiniblock(mb, direction).toBinary(),
}
}),
)
Expand Down Expand Up @@ -262,7 +270,11 @@ export class StubPersistenceStore implements IPersistenceStore {
return Promise.resolve()
}

async saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]) {
async saveMiniblocks(
streamId: string,
miniblocks: ParsedMiniblock[],
direction: 'forward' | 'backward',
) {
return Promise.resolve()
}

Expand Down
57 changes: 54 additions & 3 deletions packages/sdk/src/streamUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,53 @@ import {
} from '@river-build/proto'
import { ParsedEvent, ParsedMiniblock } from './types'
import { bin_toHexString } from '@river-build/dlog'
import { isDefined } from './check'
import { isDefined, logNever } from './check'

export function isPersistedEvent(event: ParsedEvent, direction: 'forward' | 'backward'): boolean {
if (!event.event) {
return false
}

switch (event.event.payload.case) {
case 'channelPayload':
return true
case 'dmChannelPayload':
return true
case 'gdmChannelPayload':
return true
case 'mediaPayload':
return true
case 'userPayload':
return direction === 'forward' ? true : false
case 'userSettingsPayload':
return direction === 'forward' ? true : false
case 'miniblockHeader':
return true
case 'userMetadataPayload':
return direction === 'forward' ? true : false
case 'memberPayload': {
switch (event.event.payload.value.content.case) {
case 'keySolicitation':
return direction === 'forward' ? true : false
case 'keyFulfillment':
return direction === 'forward' ? true : false
case undefined:
return false
default:
return direction === 'forward' ? true : false
}
}
case 'spacePayload':
return direction === 'forward' ? true : false
case 'userInboxPayload':
return direction === 'forward' ? true : false
case undefined:
return false
default:
logNever(event.event.payload, `unsupported event payload ${event.event.payload}`)
return false
}
}

export function persistedEventToParsedEvent(event: PersistedEvent): ParsedEvent | undefined {
if (!event.event) {
Expand Down Expand Up @@ -36,11 +82,16 @@ export function persistedMiniblockToParsedMiniblock(
}
}

export function parsedMiniblockToPersistedMiniblock(miniblock: ParsedMiniblock) {
export function parsedMiniblockToPersistedMiniblock(
miniblock: ParsedMiniblock,
direction: 'forward' | 'backward',
) {
return new PersistedMiniblock({
hash: miniblock.hash,
header: miniblock.header,
events: miniblock.events.map(parsedEventToPersistedEvent),
events: miniblock.events
.filter((event) => isPersistedEvent(event, direction))
.map(parsedEventToPersistedEvent),
})
}

Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/syncedStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export class SyncedStream extends Stream implements ISyncedStream {
lastMiniblockNum: miniblocks[miniblocks.length - 1].header.miniblockNum,
})
await this.persistenceStore.saveSyncedStream(this.streamId, cachedSyncedStream)
await this.persistenceStore.saveMiniblocks(this.streamId, miniblocks)
await this.persistenceStore.saveMiniblocks(this.streamId, miniblocks, 'forward')
this.markUpToDate()
}

Expand Down

0 comments on commit b32828b

Please sign in to comment.