diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index a96188b07..b2e8d8939 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -1881,7 +1881,11 @@ export class Client const unpackedMiniblock = await unpackMiniblock(miniblock, this.unpackEnvelopeOpts) unpackedMiniblocks.push(unpackedMiniblock) } - await this.persistenceStore.saveMiniblocks(streamIdAsString(streamId), unpackedMiniblocks) + await this.persistenceStore.saveMiniblocks( + streamIdAsString(streamId), + unpackedMiniblocks, + 'backward', + ) return { terminus: response.terminus, miniblocks: [...unpackedMiniblocks, ...cachedMiniblocks], diff --git a/packages/sdk/src/persistenceStore.ts b/packages/sdk/src/persistenceStore.ts index fa536c745..868f0e28b 100644 --- a/packages/sdk/src/persistenceStore.ts +++ b/packages/sdk/src/persistenceStore.ts @@ -78,7 +78,11 @@ export interface IPersistenceStore { > saveSyncedStream(streamId: string, syncedStream: PersistedSyncedStream): Promise saveMiniblock(streamId: string, miniblock: ParsedMiniblock): Promise - saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]): Promise + saveMiniblocks( + streamId: string, + miniblocks: ParsedMiniblock[], + direction: 'forward' | 'backward', + ): Promise getMiniblock(streamId: string, miniblockNum: bigint): Promise getMiniblocks( streamId: string, @@ -147,7 +151,7 @@ export class PersistenceStore extends Dexie implements IPersistenceStore { async saveMiniblock(streamId: string, miniblock: ParsedMiniblock) { log('saving miniblock', streamId) - const cachedMiniblock = parsedMiniblockToPersistedMiniblock(miniblock) + const cachedMiniblock = parsedMiniblockToPersistedMiniblock(miniblock, 'forward') await this.miniblocks.put({ streamId: streamId, miniblockNum: miniblock.header.miniblockNum.toString(), @@ -155,13 +159,17 @@ export class PersistenceStore extends Dexie implements IPersistenceStore { }) } - async saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]) { + async saveMiniblocks( + streamId: string, + miniblocks: ParsedMiniblock[], + direction: 'forward' | 'backward', + ) { await this.miniblocks.bulkPut( miniblocks.map((mb) => { return { streamId: streamId, miniblockNum: mb.header.miniblockNum.toString(), - data: parsedMiniblockToPersistedMiniblock(mb).toBinary(), + data: parsedMiniblockToPersistedMiniblock(mb, direction).toBinary(), } }), ) @@ -262,7 +270,11 @@ export class StubPersistenceStore implements IPersistenceStore { return Promise.resolve() } - async saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]) { + async saveMiniblocks( + streamId: string, + miniblocks: ParsedMiniblock[], + direction: 'forward' | 'backward', + ) { return Promise.resolve() } diff --git a/packages/sdk/src/streamUtils.ts b/packages/sdk/src/streamUtils.ts index f9aabb581..3c70122ee 100644 --- a/packages/sdk/src/streamUtils.ts +++ b/packages/sdk/src/streamUtils.ts @@ -6,7 +6,53 @@ import { } from '@river-build/proto' import { ParsedEvent, ParsedMiniblock } from './types' import { bin_toHexString } from '@river-build/dlog' -import { isDefined } from './check' +import { isDefined, logNever } from './check' + +export function isPersistedEvent(event: ParsedEvent, direction: 'forward' | 'backward'): boolean { + if (!event.event) { + return false + } + + switch (event.event.payload.case) { + case 'channelPayload': + return true + case 'dmChannelPayload': + return true + case 'gdmChannelPayload': + return true + case 'mediaPayload': + return true + case 'userPayload': + return direction === 'forward' ? true : false + case 'userSettingsPayload': + return direction === 'forward' ? true : false + case 'miniblockHeader': + return true + case 'userMetadataPayload': + return direction === 'forward' ? true : false + case 'memberPayload': { + switch (event.event.payload.value.content.case) { + case 'keySolicitation': + return direction === 'forward' ? true : false + case 'keyFulfillment': + return direction === 'forward' ? true : false + case undefined: + return false + default: + return direction === 'forward' ? true : false + } + } + case 'spacePayload': + return direction === 'forward' ? true : false + case 'userInboxPayload': + return direction === 'forward' ? true : false + case undefined: + return false + default: + logNever(event.event.payload, `unsupported event payload ${event.event.payload}`) + return false + } +} export function persistedEventToParsedEvent(event: PersistedEvent): ParsedEvent | undefined { if (!event.event) { @@ -36,11 +82,16 @@ export function persistedMiniblockToParsedMiniblock( } } -export function parsedMiniblockToPersistedMiniblock(miniblock: ParsedMiniblock) { +export function parsedMiniblockToPersistedMiniblock( + miniblock: ParsedMiniblock, + direction: 'forward' | 'backward', +) { return new PersistedMiniblock({ hash: miniblock.hash, header: miniblock.header, - events: miniblock.events.map(parsedEventToPersistedEvent), + events: miniblock.events + .filter((event) => isPersistedEvent(event, direction)) + .map(parsedEventToPersistedEvent), }) } diff --git a/packages/sdk/src/syncedStream.ts b/packages/sdk/src/syncedStream.ts index d3aae0c92..21569121e 100644 --- a/packages/sdk/src/syncedStream.ts +++ b/packages/sdk/src/syncedStream.ts @@ -115,7 +115,7 @@ export class SyncedStream extends Stream implements ISyncedStream { lastMiniblockNum: miniblocks[miniblocks.length - 1].header.miniblockNum, }) await this.persistenceStore.saveSyncedStream(this.streamId, cachedSyncedStream) - await this.persistenceStore.saveMiniblocks(this.streamId, miniblocks) + await this.persistenceStore.saveMiniblocks(this.streamId, miniblocks, 'forward') this.markUpToDate() }