diff --git a/packages/encryption/src/decryptionExtensions.ts b/packages/encryption/src/decryptionExtensions.ts index 68b491963..0c4897958 100644 --- a/packages/encryption/src/decryptionExtensions.ts +++ b/packages/encryption/src/decryptionExtensions.ts @@ -41,6 +41,11 @@ export type DecryptionEvents = { decryptionExtStatusChanged: (status: DecryptionStatus) => void } +export interface NewGroupSessionItem { + streamId: string + sessions: UserInboxPayload_GroupEncryptionSessions +} + export interface EncryptedContentItem { streamId: string eventId: string @@ -48,17 +53,12 @@ export interface EncryptedContentItem { encryptedData: EncryptedData } -interface DecryptionRetryItem { - streamId: string - event: EncryptedContentItem - retryAt: Date -} - export interface KeySolicitationContent { deviceKey: string fallbackKey: string isNewDevice: boolean sessionIds: string[] + srcEventId: string } export interface KeySolicitationItem { @@ -123,20 +123,20 @@ export abstract class BaseDecryptionExtensions { private _status: DecryptionStatus = DecryptionStatus.initializing private queues = { priorityTasks: new Array<() => Promise>(), - newGroupSession: new Array(), + newGroupSession: new Array(), encryptedContent: new Array(), - decryptionRetries: new Array(), missingKeys: new Array(), keySolicitations: new Array(), } private upToDateStreams = new Set() - private highPriorityStreams = new Set() + private highPriorityStreams: string[] = [] private decryptionFailures: Record> = {} // streamId: sessionId: EncryptedContentItem[] private inProgressTick?: Promise private timeoutId?: NodeJS.Timeout private delayMs: number = 15 private started: boolean = false private emitter: TypedEmitter + private keySolicitationsNeedsSort = false protected _onStopFn?: () => void protected log: { @@ -194,6 +194,10 @@ export abstract class BaseDecryptionExtensions { userId: string, opts?: { skipOnChainValidation: boolean }, ): Promise + public abstract isValidEvent( + streamId: string, + eventId: string, + ): { isValid: boolean; reason?: string } public abstract isUserInboxStreamUpToDate(upToDateStreams: Set): boolean public abstract onDecryptionError(item: EncryptedContentItem, err: DecryptionSessionError): void public abstract sendKeySolicitation(args: KeySolicitationData): Promise @@ -213,7 +217,8 @@ export abstract class BaseDecryptionExtensions { _senderId: string, ): void { this.log.info('enqueueNewGroupSessions', sessions) - this.queues.newGroupSession.push(sessions) + const streamId = bin_toHexString(sessions.streamId) + this.queues.newGroupSession.push({ streamId, sessions }) this.checkStartTicking() } @@ -232,6 +237,38 @@ export abstract class BaseDecryptionExtensions { this.checkStartTicking() } + public enqueueInitKeySolicitations( + streamId: string, + members: { + userId: string + userAddress: Uint8Array + solicitations: KeySolicitationContent[] + }[], + ): void { + this.queues.keySolicitations = this.queues.keySolicitations.filter( + (x) => x.streamId !== streamId, + ) + for (const member of members) { + const { userId: fromUserId, userAddress: fromUserAddress } = member + for (const keySolicitation of member.solicitations) { + if (keySolicitation.deviceKey === this.userDevice.deviceKey) { + continue + } + this.queues.keySolicitations.push({ + streamId, + fromUserId, + fromUserAddress, + solicitation: keySolicitation, + respondAfter: new Date( + Date.now() + this.getRespondDelayMSForKeySolicitation(streamId, fromUserId), + ), + } satisfies KeySolicitationItem) + } + } + this.keySolicitationsNeedsSort = true + this.checkStartTicking() + } + public enqueueKeySolicitation( streamId: string, fromUserId: string, @@ -251,19 +288,16 @@ export abstract class BaseDecryptionExtensions { } if (keySolicitation.sessionIds.length > 0 || keySolicitation.isNewDevice) { this.log.debug('new key solicitation', keySolicitation) - insertSorted( - this.queues.keySolicitations, - { - streamId, - fromUserId, - fromUserAddress, - solicitation: keySolicitation, - respondAfter: new Date( - Date.now() + this.getRespondDelayMSForKeySolicitation(streamId, fromUserId), - ), - } satisfies KeySolicitationItem, - (x) => x.respondAfter, - ) + this.keySolicitationsNeedsSort = true + this.queues.keySolicitations.push({ + streamId, + fromUserId, + fromUserAddress, + solicitation: keySolicitation, + respondAfter: new Date( + Date.now() + this.getRespondDelayMSForKeySolicitation(streamId, fromUserId), + ), + } satisfies KeySolicitationItem) this.checkStartTicking() } else if (index > -1) { this.log.debug('cleared key solicitation', keySolicitation) @@ -403,43 +437,62 @@ export abstract class BaseDecryptionExtensions { return priorityTask() } + // update any new group sessions const session = this.queues.newGroupSession.shift() if (session) { this.setStatus(DecryptionStatus.processingNewGroupSessions) return this.processNewGroupSession(session) } + for (const streamId of [...this.highPriorityStreams, undefined]) { + // + if (streamId && !this.upToDateStreams.has(streamId)) { + continue + } + //console.log('csb:dec streamId', streamId) - const encryptedContent = dequeueHighPriority( - this.queues.encryptedContent, - this.highPriorityStreams, - ) - if (encryptedContent) { - this.setStatus(DecryptionStatus.decryptingEvents) - return this.processEncryptedContentItem(encryptedContent) - } + if (!streamId) { + // respond to key solicitations from yourself + const ownKeySolicitationIndex = this.queues.keySolicitations.findIndex( + (x) => x.fromUserId === this.userId, + ) + if (ownKeySolicitationIndex > -1) { + const solicitation = this.queues.keySolicitations.splice( + ownKeySolicitationIndex, + 1, + )[0] + if (solicitation) { + this.log.debug(' processing own key solicitation') + this.setStatus(DecryptionStatus.respondingToKeyRequests) + return this.processKeySolicitation(solicitation) + } + } + } - const decryptionRetry = dequeueUpToDate( - this.queues.decryptionRetries, - now, - (x) => x.retryAt, - this.upToDateStreams, - ) - if (decryptionRetry) { - this.setStatus(DecryptionStatus.retryingDecryption) - return this.processDecryptionRetry(decryptionRetry) - } + const encryptedContent = streamId + ? dequeueItemWithStreamId(this.queues.encryptedContent, streamId) + : this.queues.encryptedContent.shift() + if (encryptedContent) { + this.setStatus(DecryptionStatus.decryptingEvents) + return this.processEncryptedContentItem(encryptedContent) + } - const missingKeys = dequeueUpToDate( - this.queues.missingKeys, - now, - (x) => x.waitUntil, - this.upToDateStreams, - ) - if (missingKeys) { - this.setStatus(DecryptionStatus.requestingKeys) - return this.processMissingKeys(missingKeys) + const missingKey = streamId + ? dequeueItemWithStreamId(this.queues.missingKeys, streamId) + : dequeueUpToDate( + this.queues.missingKeys, + now, + (x) => x.waitUntil, + this.upToDateStreams, + ) + if (missingKey) { + this.setStatus(DecryptionStatus.requestingKeys) + return this.processMissingKeys(missingKey) + } } + if (this.keySolicitationsNeedsSort) { + this.sortKeySolicitations() + } const keySolicitation = dequeueUpToDate( this.queues.keySolicitations, now, @@ -460,17 +513,15 @@ export abstract class BaseDecryptionExtensions { * process new group sessions that were sent to our to device stream inbox * re-enqueue any decryption failures with matching session id */ - private async processNewGroupSession( - session: UserInboxPayload_GroupEncryptionSessions, - ): Promise { - this.log.debug('processNewGroupSession', session) + private async processNewGroupSession(sessionItem: NewGroupSessionItem): Promise { + const { streamId, sessions: session } = sessionItem // check if this message is to our device const ciphertext = session.ciphertexts[this.userDevice.deviceKey] if (!ciphertext) { this.log.debug('skipping, no session for our device') return } - const streamId = bin_toHexString(session.streamId) + this.log.debug('processNewGroupSession', session) // check if it contains any keys we need const neededKeyIndexs = [] for (let i = 0; i < session.sessionIds.length; i++) { @@ -530,51 +581,12 @@ export abstract class BaseDecryptionExtensions { */ private async processEncryptedContentItem(item: EncryptedContentItem): Promise { this.log.debug('processEncryptedContentItem', item) - try { - // do the work to decrypt the event - this.log.debug('decrypting content') - await this.decryptGroupEvent(item.streamId, item.eventId, item.kind, item.encryptedData) - } catch (err: unknown) { - const sessionNotFound = isSessionNotFoundError(err) - this.log.debug('failed to decrypt', err, 'sessionNotFound', sessionNotFound) - - // If !sessionNotFound, we want to know more about this error. - if (!sessionNotFound) { - this.log.info('failed to decrypt', err, 'streamId', item.streamId) - } - - this.onDecryptionError(item, { - missingSession: sessionNotFound, - kind: item.kind, - encryptedData: item.encryptedData, - error: err, - }) - - insertSorted( - this.queues.decryptionRetries, - { - streamId: item.streamId, - event: item, - retryAt: new Date(Date.now() + 3000), // give it 3 seconds, maybe someone will send us the key - }, - (x) => x.retryAt, - ) - } - } - - /** - * processDecryptionRetry - * retry decryption a second time for a failed decryption, keys may have arrived - */ - private async processDecryptionRetry(retryItem: DecryptionRetryItem): Promise { - const item = retryItem.event try { this.log.debug('retrying decryption', item) await this.decryptGroupEvent(item.streamId, item.eventId, item.kind, item.encryptedData) } catch (err) { const sessionNotFound = isSessionNotFoundError(err) - this.log.info('failed to decrypt on retry', err, 'sessionNotFound', sessionNotFound) this.onDecryptionError(item, { missingSession: sessionNotFound, kind: item.kind, @@ -598,6 +610,8 @@ export abstract class BaseDecryptionExtensions { { streamId, waitUntil: new Date(Date.now() + 1000) }, (x) => x.waitUntil, ) + } else { + this.log.info('failed to decrypt', err, 'streamId', item.streamId) } } } @@ -675,6 +689,16 @@ export abstract class BaseDecryptionExtensions { const knownSessionIds = (await this.crypto.encryptionDevice.getInboundGroupSessionIds(streamId)) ?? [] + const { isValid, reason } = this.isValidEvent(streamId, item.solicitation.srcEventId) + if (!isValid) { + this.log.error('processing key solicitation: invalid event id', { + streamId, + eventId: item.solicitation.srcEventId, + reason, + }) + return + } + knownSessionIds.sort() const requestedSessionIds = new Set(item.solicitation.sessionIds.sort()) const replySessionIds = item.solicitation.isNewDevice @@ -743,7 +767,14 @@ export abstract class BaseDecryptionExtensions { } public setHighPriorityStreams(streamIds: string[]) { - this.highPriorityStreams = new Set(streamIds) + this.highPriorityStreams = streamIds + } + + private sortKeySolicitations() { + this.queues.keySolicitations.sort( + (a, b) => a.respondAfter.getTime() - b.respondAfter.getTime(), + ) + this.keySolicitationsNeedsSort = false } } @@ -793,13 +824,13 @@ function dequeueUpToDate( return items.splice(index, 1)[0] } -function dequeueHighPriority( +function dequeueItemWithStreamId( items: T[], - highPriorityIds: Set, + streamId: string, ): T | undefined { - const index = items.findIndex((x) => highPriorityIds.has(x.streamId)) + const index = items.findIndex((x) => x.streamId === streamId) if (index === -1) { - return items.shift() + return undefined } return items.splice(index, 1)[0] } diff --git a/packages/encryption/src/tests/decryptionExtensions.test.ts b/packages/encryption/src/tests/decryptionExtensions.test.ts index f77d3b3da..cc1bf9622 100644 --- a/packages/encryption/src/tests/decryptionExtensions.test.ts +++ b/packages/encryption/src/tests/decryptionExtensions.test.ts @@ -61,6 +61,7 @@ describe('TestDecryptionExtensions', () => { fallbackKey: aliceDex.userDevice.fallbackKey, isNewDevice: true, sessionIds: [sessionId], + srcEventId: '', } const keySolicitation = aliceClient.sendKeySolicitation(keySolicitationData) // pretend bob receives a key solicitation request from alice, and starts processing it. @@ -286,6 +287,11 @@ class MockDecryptionExtensions extends BaseDecryptionExtensions { return this._upToDateStreams.has(streamId) } + public isValidEvent(streamId: string, eventId: string): { isValid: boolean; reason?: string } { + log('isValidEvent', streamId, eventId) + return { isValid: true } + } + public decryptGroupEvent( _streamId: string, _eventId: string, diff --git a/packages/proto/internal.proto b/packages/proto/internal.proto index 06586466f..a530dc445 100644 --- a/packages/proto/internal.proto +++ b/packages/proto/internal.proto @@ -9,6 +9,7 @@ message PersistedEvent { bytes hash = 2; string prev_miniblock_hash_str = 3; string creator_user_id = 4; + bytes signature = 5; } message PersistedMiniblock { diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 34f791184..b2e8d8939 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -83,6 +83,7 @@ import { isUserId, } from './id' import { + checkEventSignature, makeEvent, UnpackEnvelopeOpts, unpackMiniblock, @@ -192,6 +193,7 @@ export class Client private decryptionExtensions?: BaseDecryptionExtensions private syncedStreamsExtensions?: SyncedStreamsExtension private persistenceStore: IPersistenceStore + private validatedEvents: Record = {} constructor( signerContext: SignerContext, @@ -304,6 +306,45 @@ export class Client return stream } + isValidEvent(streamId: string, eventId: string): { isValid: boolean; reason?: string } { + // if we didn't disable signature validation, we can assume the event is valid + if (this.unpackEnvelopeOpts?.disableSignatureValidation !== true) { + return { isValid: true } + } + const stream = this.stream(streamId) + if (!stream) { + return { isValid: false, reason: 'stream not found' } + } + const event = stream.view.events.get(eventId) + if (!event) { + return { isValid: false, reason: 'event not found' } + } + if (!event.remoteEvent) { + return { isValid: false, reason: 'remote event not found' } + } + if (!event.remoteEvent.signature) { + return { isValid: false, reason: 'remote event signature not found' } + } + if (this.validatedEvents[eventId]) { + return this.validatedEvents[eventId] + } + try { + checkEventSignature( + event.remoteEvent.event, + event.remoteEvent.hash, + event.remoteEvent.signature, + ) + const result = { isValid: true } + this.validatedEvents[eventId] = result + return result + } catch (err) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + const result = { isValid: false, reason: `error: ${err}` } + this.validatedEvents[eventId] = result + return result + } + } + private async initUserJoinedStreams() { assert(isDefined(this.userStreamId), 'userStreamId must be set') assert(isDefined(this.syncedStreamsExtensions), 'syncedStreamsExtensions must be set') @@ -1840,7 +1881,11 @@ export class Client const unpackedMiniblock = await unpackMiniblock(miniblock, this.unpackEnvelopeOpts) unpackedMiniblocks.push(unpackedMiniblock) } - await this.persistenceStore.saveMiniblocks(streamIdAsString(streamId), unpackedMiniblocks) + await this.persistenceStore.saveMiniblocks( + streamIdAsString(streamId), + unpackedMiniblocks, + 'backward', + ) return { terminus: response.terminus, miniblocks: [...unpackedMiniblocks, ...cachedMiniblocks], diff --git a/packages/sdk/src/clientDecryptionExtensions.ts b/packages/sdk/src/clientDecryptionExtensions.ts index b89657d1d..b51f8daa2 100644 --- a/packages/sdk/src/clientDecryptionExtensions.ts +++ b/packages/sdk/src/clientDecryptionExtensions.ts @@ -70,11 +70,21 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { keySolicitation: KeySolicitationContent, ) => this.enqueueKeySolicitation(streamId, fromUserId, fromUserAddress, keySolicitation) + const onInitKeySolicitations = ( + streamId: string, + members: { + userId: string + userAddress: Uint8Array + solicitations: KeySolicitationContent[] + }[], + ) => this.enqueueInitKeySolicitations(streamId, members) + client.on('streamUpToDate', onStreamUpToDate) client.on('newGroupSessions', onNewGroupSessions) client.on('newEncryptedContent', onNewEncryptedContent) client.on('newKeySolicitation', onKeySolicitation) client.on('updatedKeySolicitation', onKeySolicitation) + client.on('initKeySolicitations', onInitKeySolicitations) client.on('streamNewUserJoined', onMembershipChange) this._onStopFn = () => { @@ -83,6 +93,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { client.off('newEncryptedContent', onNewEncryptedContent) client.off('newKeySolicitation', onKeySolicitation) client.off('updatedKeySolicitation', onKeySolicitation) + client.off('initKeySolicitations', onInitKeySolicitations) client.off('streamNewUserJoined', onMembershipChange) } this.log.debug('new ClientDecryptionExtensions', { userDevice }) @@ -130,7 +141,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { const multiplier = userId === this.userId ? 0.5 : 1 const stream = this.client.stream(streamId) check(isDefined(stream), 'stream not found') - const numMembers = stream.view.getMembers().participants().size + const numMembers = stream.view.getMembers().joinedParticipants().size const maxWaitTimeSeconds = Math.max(5, Math.min(30, numMembers)) const waitTime = maxWaitTimeSeconds * 1000 * Math.random() // this could be much better this.log.debug('getRespondDelayMSForKeySolicitation', { streamId, userId, waitTime }) @@ -179,6 +190,10 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { return true } + public isValidEvent(streamId: string, eventId: string): { isValid: boolean; reason?: string } { + return this.client.isValidEvent(streamId, eventId) + } + public onDecryptionError(item: EncryptedContentItem, err: DecryptionSessionError): void { this.client.stream(item.streamId)?.updateDecryptedContentError(item.eventId, { missingSession: err.missingSession, diff --git a/packages/sdk/src/persistenceStore.ts b/packages/sdk/src/persistenceStore.ts index fa536c745..868f0e28b 100644 --- a/packages/sdk/src/persistenceStore.ts +++ b/packages/sdk/src/persistenceStore.ts @@ -78,7 +78,11 @@ export interface IPersistenceStore { > saveSyncedStream(streamId: string, syncedStream: PersistedSyncedStream): Promise saveMiniblock(streamId: string, miniblock: ParsedMiniblock): Promise - saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]): Promise + saveMiniblocks( + streamId: string, + miniblocks: ParsedMiniblock[], + direction: 'forward' | 'backward', + ): Promise getMiniblock(streamId: string, miniblockNum: bigint): Promise getMiniblocks( streamId: string, @@ -147,7 +151,7 @@ export class PersistenceStore extends Dexie implements IPersistenceStore { async saveMiniblock(streamId: string, miniblock: ParsedMiniblock) { log('saving miniblock', streamId) - const cachedMiniblock = parsedMiniblockToPersistedMiniblock(miniblock) + const cachedMiniblock = parsedMiniblockToPersistedMiniblock(miniblock, 'forward') await this.miniblocks.put({ streamId: streamId, miniblockNum: miniblock.header.miniblockNum.toString(), @@ -155,13 +159,17 @@ export class PersistenceStore extends Dexie implements IPersistenceStore { }) } - async saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]) { + async saveMiniblocks( + streamId: string, + miniblocks: ParsedMiniblock[], + direction: 'forward' | 'backward', + ) { await this.miniblocks.bulkPut( miniblocks.map((mb) => { return { streamId: streamId, miniblockNum: mb.header.miniblockNum.toString(), - data: parsedMiniblockToPersistedMiniblock(mb).toBinary(), + data: parsedMiniblockToPersistedMiniblock(mb, direction).toBinary(), } }), ) @@ -262,7 +270,11 @@ export class StubPersistenceStore implements IPersistenceStore { return Promise.resolve() } - async saveMiniblocks(streamId: string, miniblocks: ParsedMiniblock[]) { + async saveMiniblocks( + streamId: string, + miniblocks: ParsedMiniblock[], + direction: 'forward' | 'backward', + ) { return Promise.resolve() } diff --git a/packages/sdk/src/sign.ts b/packages/sdk/src/sign.ts index 1b9ec4794..a9e41244f 100644 --- a/packages/sdk/src/sign.ts +++ b/packages/sdk/src/sign.ts @@ -189,47 +189,56 @@ export const unpackEnvelope = async ( const event = StreamEvent.fromBinary(envelope.event) let hash = envelope.hash - const checkEventHash = opts?.disableHashValidation !== true - if (checkEventHash) { + const doCheckEventHash = opts?.disableHashValidation !== true + if (doCheckEventHash) { hash = riverHash(envelope.event) check(bin_equal(hash, envelope.hash), 'Event id is not valid', Err.BAD_EVENT_ID) } - const checkEventSignature = opts?.disableSignatureValidation !== true - if (checkEventSignature) { - const recoveredPubKey = riverRecoverPubKey(hash, envelope.signature) - - if (!hasElements(event.delegateSig)) { - const address = publicKeyToAddress(recoveredPubKey) - check( - bin_equal(address, event.creatorAddress), - 'Event signature is not valid', - Err.BAD_EVENT_SIGNATURE, - ) - } else { - checkDelegateSig({ - delegatePubKey: recoveredPubKey, - creatorAddress: event.creatorAddress, - delegateSig: event.delegateSig, - expiryEpochMs: event.delegateExpiryEpochMs, - }) - } + const doCheckEventSignature = opts?.disableSignatureValidation !== true + if (doCheckEventSignature) { + checkEventSignature(event, hash, envelope.signature) } - return makeParsedEvent(event, envelope.hash) + return makeParsedEvent(event, envelope.hash, envelope.signature) } -export function makeParsedEvent(event: StreamEvent, hash?: Uint8Array) { +export function checkEventSignature(event: StreamEvent, hash: Uint8Array, signature: Uint8Array) { + const recoveredPubKey = riverRecoverPubKey(hash, signature) + + if (!hasElements(event.delegateSig)) { + const address = publicKeyToAddress(recoveredPubKey) + check( + bin_equal(address, event.creatorAddress), + 'Event signature is not valid', + Err.BAD_EVENT_SIGNATURE, + ) + } else { + checkDelegateSig({ + delegatePubKey: recoveredPubKey, + creatorAddress: event.creatorAddress, + delegateSig: event.delegateSig, + expiryEpochMs: event.delegateExpiryEpochMs, + }) + } +} + +export function makeParsedEvent( + event: StreamEvent, + hash: Uint8Array | undefined, + signature: Uint8Array | undefined, +) { hash = hash ?? riverHash(event.toBinary()) return { event, hash, hashStr: bin_toHexString(hash), + signature, prevMiniblockHashStr: event.prevMiniblockHash ? bin_toHexString(event.prevMiniblockHash) : undefined, creatorUserId: userIdFromAddress(event.creatorAddress), - } + } satisfies ParsedEvent } export const unpackEnvelopes = async ( diff --git a/packages/sdk/src/streamEvents.ts b/packages/sdk/src/streamEvents.ts index 2fe64944f..12691ec87 100644 --- a/packages/sdk/src/streamEvents.ts +++ b/packages/sdk/src/streamEvents.ts @@ -43,6 +43,14 @@ export type StreamEncryptionEvents = { fromUserAddress: Uint8Array, event: KeySolicitationContent, ) => void + initKeySolicitations: ( + streamId: string, + members: { + userId: string + userAddress: Uint8Array + solicitations: KeySolicitationContent[] + }[], + ) => void userDeviceKeyMessage: (streamId: string, userId: string, userDevice: UserDevice) => void } diff --git a/packages/sdk/src/streamStateView.ts b/packages/sdk/src/streamStateView.ts index 1f5f1956e..b4b7ddbcd 100644 --- a/packages/sdk/src/streamStateView.ts +++ b/packages/sdk/src/streamStateView.ts @@ -293,7 +293,7 @@ export class StreamStateView implements IStreamStateView { default: logNever(snapshot.content) } - this.membershipContent.applySnapshot(snapshot, cleartexts, encryptionEmitter) + this.membershipContent.applySnapshot(eventHash, snapshot, cleartexts, encryptionEmitter) } private appendStreamAndCookie( diff --git a/packages/sdk/src/streamStateView_Members.ts b/packages/sdk/src/streamStateView_Members.ts index 7dc430223..9203457ec 100644 --- a/packages/sdk/src/streamStateView_Members.ts +++ b/packages/sdk/src/streamStateView_Members.ts @@ -59,6 +59,7 @@ export class StreamStateView_Members extends StreamStateView_AbstractContent { // initialization applySnapshot( + eventId: string, snapshot: Snapshot, cleartexts: Record | undefined, encryptionEmitter: TypedEmitter | undefined, @@ -80,6 +81,7 @@ export class StreamStateView_Members extends StreamStateView_AbstractContent { fallbackKey: s.fallbackKey, isNewDevice: s.isNewDevice, sessionIds: [...s.sessionIds], + srcEventId: eventId, } satisfies KeySolicitationContent), ), encryptedUsername: member.username, @@ -132,7 +134,11 @@ export class StreamStateView_Members extends StreamStateView_AbstractContent { snapshot.members?.pins.forEach((snappedPin) => { if (snappedPin.pin?.event) { - const parsedEvent = makeParsedEvent(snappedPin.pin.event, snappedPin.pin.eventId) + const parsedEvent = makeParsedEvent( + snappedPin.pin.event, + snappedPin.pin.eventId, + undefined, + ) const remoteEvent = makeRemoteTimelineEvent({ parsedEvent, eventNum: 0n }) const cleartext = cleartexts?.[remoteEvent.hashStr] this.addPin( @@ -204,6 +210,7 @@ export class StreamStateView_Members extends StreamStateView_AbstractContent { check(isDefined(stateMember), 'key solicitation from non-member') this.solicitHelper.applySolicitation( stateMember, + event.hashStr, payload.content.value, encryptionEmitter, ) @@ -281,7 +288,7 @@ export class StreamStateView_Members extends StreamStateView_AbstractContent { { const pin = payload.content.value check(isDefined(pin.event), 'invalid pin event') - const parsedEvent = makeParsedEvent(pin.event, pin.eventId) + const parsedEvent = makeParsedEvent(pin.event, pin.eventId, undefined) const remoteEvent = makeRemoteTimelineEvent({ parsedEvent, eventNum: 0n }) this.addPin( event.creatorUserId, diff --git a/packages/sdk/src/streamStateView_Members_Solicitations.ts b/packages/sdk/src/streamStateView_Members_Solicitations.ts index f5f53492a..6fbebf96a 100644 --- a/packages/sdk/src/streamStateView_Members_Solicitations.ts +++ b/packages/sdk/src/streamStateView_Members_Solicitations.ts @@ -3,6 +3,7 @@ import { MemberPayload_KeyFulfillment, MemberPayload_KeySolicitation } from '@ri import { StreamEncryptionEvents } from './streamEvents' import { StreamMember } from './streamStateView_Members' import { removeCommon } from './utils' +import { KeySolicitationContent } from '@river-build/encryption' export class StreamStateView_Members_Solicitations { constructor(readonly streamId: string) {} @@ -11,40 +12,40 @@ export class StreamStateView_Members_Solicitations { members: StreamMember[], encryptionEmitter: TypedEmitter | undefined, ): void { - for (const member of members) { - for (const event of member.solicitations) { - encryptionEmitter?.emit( - 'newKeySolicitation', - this.streamId, - member.userId, - member.userAddress, - event, - ) - } - } + encryptionEmitter?.emit( + 'initKeySolicitations', + this.streamId, + members.map((member) => ({ + userId: member.userId, + userAddress: member.userAddress, + solicitations: member.solicitations, + })), + ) } applySolicitation( user: StreamMember, + eventId: string, solicitation: MemberPayload_KeySolicitation, encryptionEmitter: TypedEmitter | undefined, ): void { user.solicitations = user.solicitations.filter( (x) => x.deviceKey !== solicitation.deviceKey, ) - user.solicitations.push({ + const newSolicitation = { deviceKey: solicitation.deviceKey, fallbackKey: solicitation.fallbackKey, isNewDevice: solicitation.isNewDevice, - sessionIds: [...solicitation.sessionIds.sort()], - }) - + sessionIds: solicitation.sessionIds.toSorted(), + srcEventId: eventId, + } satisfies KeySolicitationContent + user.solicitations.push(newSolicitation) encryptionEmitter?.emit( 'newKeySolicitation', this.streamId, user.userId, user.userAddress, - solicitation, + newSolicitation, ) } @@ -62,8 +63,9 @@ export class StreamStateView_Members_Solicitations { deviceKey: prev.deviceKey, fallbackKey: prev.fallbackKey, isNewDevice: false, - sessionIds: [...removeCommon(prev.sessionIds, fulfillment.sessionIds.sort())], - } + sessionIds: [...removeCommon(prev.sessionIds, fulfillment.sessionIds.toSorted())], + srcEventId: prev.srcEventId, + } satisfies KeySolicitationContent user.solicitations[index] = newEvent encryptionEmitter?.emit( 'updatedKeySolicitation', diff --git a/packages/sdk/src/streamUtils.ts b/packages/sdk/src/streamUtils.ts index 76b20abc5..3c70122ee 100644 --- a/packages/sdk/src/streamUtils.ts +++ b/packages/sdk/src/streamUtils.ts @@ -6,7 +6,53 @@ import { } from '@river-build/proto' import { ParsedEvent, ParsedMiniblock } from './types' import { bin_toHexString } from '@river-build/dlog' -import { isDefined } from './check' +import { isDefined, logNever } from './check' + +export function isPersistedEvent(event: ParsedEvent, direction: 'forward' | 'backward'): boolean { + if (!event.event) { + return false + } + + switch (event.event.payload.case) { + case 'channelPayload': + return true + case 'dmChannelPayload': + return true + case 'gdmChannelPayload': + return true + case 'mediaPayload': + return true + case 'userPayload': + return direction === 'forward' ? true : false + case 'userSettingsPayload': + return direction === 'forward' ? true : false + case 'miniblockHeader': + return true + case 'userMetadataPayload': + return direction === 'forward' ? true : false + case 'memberPayload': { + switch (event.event.payload.value.content.case) { + case 'keySolicitation': + return direction === 'forward' ? true : false + case 'keyFulfillment': + return direction === 'forward' ? true : false + case undefined: + return false + default: + return direction === 'forward' ? true : false + } + } + case 'spacePayload': + return direction === 'forward' ? true : false + case 'userInboxPayload': + return direction === 'forward' ? true : false + case undefined: + return false + default: + logNever(event.event.payload, `unsupported event payload ${event.event.payload}`) + return false + } +} export function persistedEventToParsedEvent(event: PersistedEvent): ParsedEvent | undefined { if (!event.event) { @@ -16,6 +62,7 @@ export function persistedEventToParsedEvent(event: PersistedEvent): ParsedEvent event: event.event, hash: event.hash, hashStr: bin_toHexString(event.hash), + signature: event.signature, prevMiniblockHashStr: event.prevMiniblockHashStr.length > 0 ? event.prevMiniblockHashStr : undefined, creatorUserId: event.creatorUserId, @@ -35,11 +82,16 @@ export function persistedMiniblockToParsedMiniblock( } } -export function parsedMiniblockToPersistedMiniblock(miniblock: ParsedMiniblock) { +export function parsedMiniblockToPersistedMiniblock( + miniblock: ParsedMiniblock, + direction: 'forward' | 'backward', +) { return new PersistedMiniblock({ hash: miniblock.hash, header: miniblock.header, - events: miniblock.events.map(parsedEventToPersistedEvent), + events: miniblock.events + .filter((event) => isPersistedEvent(event, direction)) + .map(parsedEventToPersistedEvent), }) } @@ -47,6 +99,7 @@ function parsedEventToPersistedEvent(event: ParsedEvent) { return new PersistedEvent({ event: event.event, hash: event.hash, + signature: event.signature, prevMiniblockHashStr: event.prevMiniblockHashStr, creatorUserId: event.creatorUserId, }) diff --git a/packages/sdk/src/sync-agent/river-connection/models/transactionalClient.ts b/packages/sdk/src/sync-agent/river-connection/models/transactionalClient.ts index 62bb9761e..d122386da 100644 --- a/packages/sdk/src/sync-agent/river-connection/models/transactionalClient.ts +++ b/packages/sdk/src/sync-agent/river-connection/models/transactionalClient.ts @@ -3,6 +3,7 @@ import { Client, ClientEvents } from '../../../client' import { StreamRpcClient } from '../../../makeStreamRpcClient' import { SignerContext } from '../../../signerContext' import { Store } from '../../../store/store' +import { UnpackEnvelopeOpts } from '../../../sign' export class TransactionalClient extends Client { store: Store @@ -15,6 +16,7 @@ export class TransactionalClient extends Client { persistenceStoreName?: string, logNamespaceFilter?: string, highPriorityStreamIds?: string[], + unpackEnvelopeOpts?: UnpackEnvelopeOpts, ) { super( signerContext, @@ -24,6 +26,7 @@ export class TransactionalClient extends Client { persistenceStoreName, logNamespaceFilter, highPriorityStreamIds, + unpackEnvelopeOpts, ) this.store = store } diff --git a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts index 3651ab984..934aa5977 100644 --- a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts +++ b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts @@ -19,6 +19,7 @@ import { AuthStatus } from './models/authStatus' import { RetryParams } from '../../rpcInterceptors' import { Stream } from '../../stream' import { isDefined } from '../../check' +import { UnpackEnvelopeOpts } from '../../sign' const logger = dlogger('csb:riverConnection') @@ -31,6 +32,7 @@ export interface ClientParams { highPriorityStreamIds?: string[] rpcRetryParams?: RetryParams encryptionDevice?: EncryptionDeviceInitOpts + unpackEnvelopeOpts?: UnpackEnvelopeOpts } export type OnStoppedFn = () => void @@ -62,7 +64,7 @@ export class RiverConnection extends PersistedObservable { public spaceDapp: SpaceDapp, public riverRegistryDapp: RiverRegistry, private makeRpcClient: MakeRpcClientType, - private clientParams: ClientParams, + public clientParams: ClientParams, ) { super({ id: '0', userExists: false }, store, LoadPriority.high) this.riverChain = new RiverChain(store, riverRegistryDapp, this.userId) @@ -160,6 +162,7 @@ export class RiverConnection extends PersistedObservable { this.clientParams.persistenceStoreName, this.clientParams.logNamespaceFilter, this.clientParams.highPriorityStreamIds, + this.clientParams.unpackEnvelopeOpts, ) client.setMaxListeners(100) this.client = client diff --git a/packages/sdk/src/sync-agent/spaces/models/channel.ts b/packages/sdk/src/sync-agent/spaces/models/channel.ts index ec90c3630..68ef01a8e 100644 --- a/packages/sdk/src/sync-agent/spaces/models/channel.ts +++ b/packages/sdk/src/sync-agent/spaces/models/channel.ts @@ -67,6 +67,13 @@ export class Channel extends PersistedObservable { } } + async join() { + const channelId = this.data.id + await this.riverConnection.call(async (client) => { + await client.joinStream(channelId) + }) + } + async sendMessage( message: string, options?: { diff --git a/packages/sdk/src/sync-agent/syncAgents.test.ts b/packages/sdk/src/sync-agent/syncAgents.test.ts index c9853ea63..1b24ec5a6 100644 --- a/packages/sdk/src/sync-agent/syncAgents.test.ts +++ b/packages/sdk/src/sync-agent/syncAgents.test.ts @@ -86,6 +86,41 @@ describe('syncAgents.test.ts', () => { ) }) + test('syncAgents send a message with disableSignatureValidation=true', async () => { + const prevBobOpts = bob.riverConnection.clientParams.unpackEnvelopeOpts + const prevAliceOpts = alice.riverConnection.clientParams.unpackEnvelopeOpts + bob.riverConnection.clientParams.unpackEnvelopeOpts = { + disableSignatureValidation: true, + } + alice.riverConnection.clientParams.unpackEnvelopeOpts = { + disableSignatureValidation: true, + } + await Promise.all([bob.start(), alice.start()]) + await waitFor(() => bob.spaces.value.status === 'loaded') + const spaceId = bob.spaces.data.spaceIds[0] + const space = bob.spaces.getSpace(spaceId) + const channelId = await space.createChannel('random', bobUser.signer) + const channel = space.getChannel(channelId) + await channel.sendMessage('Hello, World again!') + + // join the channel, find the message + const aliceChannel = alice.spaces.getSpace(spaceId).getChannel(channel.data.id) + await aliceChannel.join() + logger.log(aliceChannel.timeline.events.value) + await waitFor( + () => + expect( + aliceChannel.timeline.events.value.find( + (e) => e.text === 'Hello, World again!', + ), + ).toBeDefined(), + { timeoutMS: 10000 }, + ) + // reset the unpackEnvelopeOpts + bob.riverConnection.clientParams.unpackEnvelopeOpts = prevBobOpts + alice.riverConnection.clientParams.unpackEnvelopeOpts = prevAliceOpts + }) + test('syncAgents pin a message', async () => { await Promise.all([bob.start(), alice.start()]) await waitFor(() => bob.spaces.value.status === 'loaded') diff --git a/packages/sdk/src/syncedStream.ts b/packages/sdk/src/syncedStream.ts index d3aae0c92..21569121e 100644 --- a/packages/sdk/src/syncedStream.ts +++ b/packages/sdk/src/syncedStream.ts @@ -115,7 +115,7 @@ export class SyncedStream extends Stream implements ISyncedStream { lastMiniblockNum: miniblocks[miniblocks.length - 1].header.miniblockNum, }) await this.persistenceStore.saveSyncedStream(this.streamId, cachedSyncedStream) - await this.persistenceStore.saveMiniblocks(this.streamId, miniblocks) + await this.persistenceStore.saveMiniblocks(this.streamId, miniblocks, 'forward') this.markUpToDate() } diff --git a/packages/sdk/src/syncedStreamsExtension.ts b/packages/sdk/src/syncedStreamsExtension.ts index f232130cc..c0ba01362 100644 --- a/packages/sdk/src/syncedStreamsExtension.ts +++ b/packages/sdk/src/syncedStreamsExtension.ts @@ -39,6 +39,7 @@ export class SyncedStreamsExtension { private startSyncRequested = false private didLoadStreamsFromPersistence = false + private didLoadHighPriorityStreams = false private streamCountRequiringNetworkAccess = 0 private numStreamsLoadedFromCache = 0 private numStreamsLoadedFromNetwork = 0 @@ -47,6 +48,7 @@ export class SyncedStreamsExtension { private loadedStreamCount = 0 initStatus: ClientInitStatus = { + isHighPriorityDataLoaded: false, isLocalDataLoaded: false, isRemoteDataLoaded: false, progress: 0, @@ -132,6 +134,7 @@ export class SyncedStreamsExtension { private async loadHighPriorityStreams() { const streamIds = Array.from(this.highPriorityIds) await Promise.all(streamIds.map((streamId) => this.loadStreamFromPersistence(streamId))) + this.didLoadHighPriorityStreams = true this.emitClientStatus() } @@ -216,6 +219,7 @@ export class SyncedStreamsExtension { } private emitClientStatus() { + this.initStatus.isHighPriorityDataLoaded = this.didLoadHighPriorityStreams this.initStatus.isLocalDataLoaded = this.didLoadStreamsFromPersistence this.initStatus.isRemoteDataLoaded = this.didLoadStreamsFromPersistence && this.streamCountRequiringNetworkAccess === 0 diff --git a/packages/sdk/src/tags.test.ts b/packages/sdk/src/tags.test.ts index a9a96f07f..f4819cbae 100644 --- a/packages/sdk/src/tags.test.ts +++ b/packages/sdk/src/tags.test.ts @@ -83,6 +83,8 @@ describe('makeTags', () => { createdAtEpochMs: BigInt(Date.now()), tags: undefined, }), + undefined, + undefined, ), eventNum: 0n, miniblockNum: 0n, @@ -139,6 +141,8 @@ describe('makeTags', () => { createdAtEpochMs: BigInt(Date.now()), tags: undefined, }), + undefined, + undefined, ), eventNum: 0n, miniblockNum: 0n, @@ -158,6 +162,8 @@ describe('makeTags', () => { createdAtEpochMs: BigInt(Date.now()), tags: undefined, }), + undefined, + undefined, ), eventNum: 0n, miniblockNum: 0n, @@ -196,6 +202,8 @@ describe('makeTags', () => { createdAtEpochMs: BigInt(Date.now()), tags: undefined, }), + undefined, + undefined, ), eventNum: 0n, miniblockNum: 0n, diff --git a/packages/sdk/src/types.ts b/packages/sdk/src/types.ts index 63a335c6e..dbfec9463 100644 --- a/packages/sdk/src/types.ts +++ b/packages/sdk/src/types.ts @@ -55,6 +55,7 @@ export interface ParsedEvent { event: StreamEvent hash: Uint8Array hashStr: string + signature: Uint8Array | undefined prevMiniblockHashStr?: string creatorUserId: string } @@ -154,6 +155,7 @@ export interface ParsedStreamResponse { } export type ClientInitStatus = { + isHighPriorityDataLoaded: boolean isLocalDataLoaded: boolean isRemoteDataLoaded: boolean progress: number