Skip to content

Commit

Permalink
Persistence Store - be more perscriptive about what we store in local…
Browse files Browse the repository at this point in the history
… storage (#1316)

when scrolling backwards, we don’t care about any “state” events that
have already been snapshotted, so don’t write them. we really only care
about channel messages so that we don’t have to refetch them
  • Loading branch information
texuf authored Oct 22, 2024
1 parent 339d2fc commit c8fb2e6
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 10 deletions.
6 changes: 5 additions & 1 deletion packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1881,7 +1881,11 @@ export class Client
const unpackedMiniblock = await unpackMiniblock(miniblock, this.unpackEnvelopeOpts)
unpackedMiniblocks.push(unpackedMiniblock)
}
await this.persistenceStore.saveMiniblocks(streamIdAsString(streamId), unpackedMiniblocks)
await this.persistenceStore.saveMiniblocks(
streamIdAsString(streamId),
unpackedMiniblocks,
'backward',
)
return {
terminus: response.terminus,
miniblocks: [...unpackedMiniblocks, ...cachedMiniblocks],
Expand Down
22 changes: 17 additions & 5 deletions packages/sdk/src/persistenceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ export interface IPersistenceStore {
>
saveSyncedStream(streamId: string, syncedStream: PersistedSyncedStream): Promise<void>
saveMiniblock(streamId: string, miniblock: ParsedMiniblock): Promise<void>
saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]): Promise<void>
saveMiniblocks(
streamId: string,
miniblocks: ParsedMiniblock[],
direction: 'forward' | 'backward',
): Promise<void>
getMiniblock(streamId: string, miniblockNum: bigint): Promise<ParsedMiniblock | undefined>
getMiniblocks(
streamId: string,
Expand Down Expand Up @@ -147,21 +151,25 @@ export class PersistenceStore extends Dexie implements IPersistenceStore {

async saveMiniblock(streamId: string, miniblock: ParsedMiniblock) {
log('saving miniblock', streamId)
const cachedMiniblock = parsedMiniblockToPersistedMiniblock(miniblock)
const cachedMiniblock = parsedMiniblockToPersistedMiniblock(miniblock, 'forward')
await this.miniblocks.put({
streamId: streamId,
miniblockNum: miniblock.header.miniblockNum.toString(),
data: cachedMiniblock.toBinary(),
})
}

async saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]) {
async saveMiniblocks(
streamId: string,
miniblocks: ParsedMiniblock[],
direction: 'forward' | 'backward',
) {
await this.miniblocks.bulkPut(
miniblocks.map((mb) => {
return {
streamId: streamId,
miniblockNum: mb.header.miniblockNum.toString(),
data: parsedMiniblockToPersistedMiniblock(mb).toBinary(),
data: parsedMiniblockToPersistedMiniblock(mb, direction).toBinary(),
}
}),
)
Expand Down Expand Up @@ -262,7 +270,11 @@ export class StubPersistenceStore implements IPersistenceStore {
return Promise.resolve()
}

async saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]) {
async saveMiniblocks(
streamId: string,
miniblocks: ParsedMiniblock[],
direction: 'forward' | 'backward',
) {
return Promise.resolve()
}

Expand Down
57 changes: 54 additions & 3 deletions packages/sdk/src/streamUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,53 @@ import {
} from '@river-build/proto'
import { ParsedEvent, ParsedMiniblock } from './types'
import { bin_toHexString } from '@river-build/dlog'
import { isDefined } from './check'
import { isDefined, logNever } from './check'

export function isPersistedEvent(event: ParsedEvent, direction: 'forward' | 'backward'): boolean {
if (!event.event) {
return false
}

switch (event.event.payload.case) {
case 'channelPayload':
return true
case 'dmChannelPayload':
return true
case 'gdmChannelPayload':
return true
case 'mediaPayload':
return true
case 'userPayload':
return direction === 'forward' ? true : false
case 'userSettingsPayload':
return direction === 'forward' ? true : false
case 'miniblockHeader':
return true
case 'userMetadataPayload':
return direction === 'forward' ? true : false
case 'memberPayload': {
switch (event.event.payload.value.content.case) {
case 'keySolicitation':
return direction === 'forward' ? true : false
case 'keyFulfillment':
return direction === 'forward' ? true : false
case undefined:
return false
default:
return direction === 'forward' ? true : false
}
}
case 'spacePayload':
return direction === 'forward' ? true : false
case 'userInboxPayload':
return direction === 'forward' ? true : false
case undefined:
return false
default:
logNever(event.event.payload, `unsupported event payload ${event.event.payload}`)
return false
}
}

export function persistedEventToParsedEvent(event: PersistedEvent): ParsedEvent | undefined {
if (!event.event) {
Expand Down Expand Up @@ -36,11 +82,16 @@ export function persistedMiniblockToParsedMiniblock(
}
}

export function parsedMiniblockToPersistedMiniblock(miniblock: ParsedMiniblock) {
export function parsedMiniblockToPersistedMiniblock(
miniblock: ParsedMiniblock,
direction: 'forward' | 'backward',
) {
return new PersistedMiniblock({
hash: miniblock.hash,
header: miniblock.header,
events: miniblock.events.map(parsedEventToPersistedEvent),
events: miniblock.events
.filter((event) => isPersistedEvent(event, direction))
.map(parsedEventToPersistedEvent),
})
}

Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/syncedStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export class SyncedStream extends Stream implements ISyncedStream {
lastMiniblockNum: miniblocks[miniblocks.length - 1].header.miniblockNum,
})
await this.persistenceStore.saveSyncedStream(this.streamId, cachedSyncedStream)
await this.persistenceStore.saveMiniblocks(this.streamId, miniblocks)
await this.persistenceStore.saveMiniblocks(this.streamId, miniblocks, 'forward')
this.markUpToDate()
}

Expand Down

0 comments on commit c8fb2e6

Please sign in to comment.