diff --git a/packages/core/src/stream-loading/__tests__/stream-loader.test.ts b/packages/core/src/stream-loading/__tests__/stream-loader.test.ts index 2221ba0c35..9eb9fd0024 100644 --- a/packages/core/src/stream-loading/__tests__/stream-loader.test.ts +++ b/packages/core/src/stream-loading/__tests__/stream-loader.test.ts @@ -21,8 +21,16 @@ import { StreamLoader } from '../stream-loader.js' import { TipFetcher } from '../tip-fetcher.js' import { AnchorTimestampExtractor } from '../anchor-timestamp-extractor.js' import { InMemoryAnchorService } from '../../anchor/memory/in-memory-anchor-service.js' -import { CommitID } from '@ceramicnetwork/streamid' +import { CommitID, StreamID } from '@ceramicnetwork/streamid' import cloneDeep from 'lodash.clonedeep' +import { CID } from 'multiformats/cid' +import { + MsgType, + PubsubMessage, + QueryMessage, + ResponseMessage, +} from '../../pubsub/pubsub-message.js' +import { asIpfsMessage } from '../../pubsub/__tests__/as-ipfs-message.js' const TOPIC = '/ceramic/test12345' const CONTENT0 = { step: 0 } @@ -42,7 +50,7 @@ function expectStatesEqualWithPendingAnchor( ) } -describe('StreamLoader test', () => { +describe('StreamLoader querying against real Ceramic node', () => { jest.setTimeout(1000 * 30) let dispatcher: Dispatcher @@ -59,7 +67,8 @@ describe('StreamLoader test', () => { dispatcherIpfs = await createIPFS() dispatcher = await createDispatcher(dispatcherIpfs, TOPIC) // speed up how quickly the dispatcher gives up on loading a non-existent commit from ipfs. - dispatcher._ipfsTimeout = 1000 + dispatcher._ipfsTimeout = 100 + await dispatcher.init() const logger = new LoggerProvider().getDiagnosticsLogger() const tipFetcher = new TipFetcher(dispatcher.messageBus) @@ -248,3 +257,324 @@ describe('StreamLoader test', () => { }) }) }) + +/** + * Intercepts the function that publishes messages to pubsub and returns the first query message + * published (while still passing it on to pubsub as normal). + */ +function getQueryPublishedPromise( + dispatcher: Dispatcher, + streamID: StreamID, + originalPubsubPublish +): Promise { + const pubsubPublishSpy = jest.spyOn(dispatcher.messageBus.pubsub, 'next') + return new Promise((resolve) => { + pubsubPublishSpy.mockImplementation((message: PubsubMessage) => { + if (message.typ == MsgType.QUERY && message.stream.equals(streamID)) { + resolve(message) + } + return originalPubsubPublish(message) + }) + }) +} + +function makeResponse(streamID: StreamID, queryId: string, cid: CID): ResponseMessage { + const tipMap = new Map().set(streamID.toString(), cid) + const response = { typ: MsgType.RESPONSE as const, id: queryId, tips: tipMap } + return response +} + +describe('StreamLoader querying against mocked pubsub responses', () => { + jest.setTimeout(1000 * 30) + + let ipfs: IpfsApi + + let dispatcher: Dispatcher + let streamLoader: StreamLoader + + let stream: TileDocument + let commitCids: Array + const states: Array = [] + + let receiveMessage + let originalPubsubPublish + + beforeAll(async () => { + ipfs = await createIPFS() + const ceramic = await createCeramic(ipfs, { pubsubTopic: TOPIC }) + + dispatcher = await createDispatcher(ipfs, TOPIC) + // speed up how quickly the dispatcher gives up on loading a non-existent commit from ipfs. + dispatcher._ipfsTimeout = 100 + + // Intercept the function passed to ipfs on pubsub subscription so that we can publish new + // messages directly + const originalPubsubSubscribe = ipfs.pubsub.subscribe.bind(ipfs.pubsub) + const pubsubSubscribeSpy = jest.spyOn(ipfs.pubsub, 'subscribe') + pubsubSubscribeSpy.mockImplementation(async (topic, onMessageCallback, opts) => { + receiveMessage = onMessageCallback + return originalPubsubSubscribe(topic, onMessageCallback, opts) + }) + originalPubsubPublish = dispatcher.messageBus.pubsub.next.bind(dispatcher.messageBus.pubsub) + + const logger = new LoggerProvider().getDiagnosticsLogger() + const tipFetcher = new TipFetcher(dispatcher.messageBus) + const logSyncer = new LogSyncer(dispatcher) + const anchorTimestampExtractor = new AnchorTimestampExtractor( + logger, + dispatcher, + ceramic.context.anchorService as InMemoryAnchorService + ) + const handlers = new HandlersMap(logger) + const stateManipulator = new StateManipulator( + logger, + handlers, + { did: ceramic.did, api: ceramic } as Context, + logSyncer + ) + streamLoader = new StreamLoader( + logger, + tipFetcher, + logSyncer, + anchorTimestampExtractor, + stateManipulator + ) + + stream = await TileDocument.create(ceramic, CONTENT0) + states.push(stream.state) + await TestUtils.anchorUpdate(ceramic, stream) + states.push(stream.state) + await stream.update(CONTENT1) + states.push(stream.state) + await stream.update(CONTENT2) + states.push(stream.state) + await TestUtils.anchorUpdate(ceramic, stream) + states.push(stream.state) + commitCids = stream.allCommitIds.map((commitId) => commitId.commit) + expect(commitCids.length).toEqual(5) + // When we load the state from the network we won't ever see a pending anchor status. + states.forEach((state) => { + if (state.anchorStatus == AnchorStatus.PENDING) { + state.anchorStatus = AnchorStatus.NOT_REQUESTED + } + }) + expect(states[0].log.length).toEqual(1) + expect(states[1].log.length).toEqual(2) + expect(states[2].log.length).toEqual(3) + expect(states[3].log.length).toEqual(4) + expect(states[4].log.length).toEqual(5) + + // Close the Ceramic node before tests start so it won't respond to any pubsub messages. + await ceramic.close() + }) + + afterAll(async () => { + await dispatcher.close() + + // Wait for pubsub unsubscribe to be processed + // TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe + // has been processed + await TestUtils.delay(5000) + + await ipfs.stop() + }) + + describe('loadStream', () => { + test('Invalid tip', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const loadStreamPromise = streamLoader.loadStream(stream.id, 1) + + const queryMessage = await queryPublished + + const invalidTip = TestUtils.randomCID() + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, invalidTip))) + + const loaded = await loadStreamPromise + + // If no valid tip is received, should return genesis state + expect(loaded.log.length).toEqual(1) + expect(loaded.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + expect(loaded.content).toEqual(CONTENT0) + }) + + test('Considers multiple tips - second tip better', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const loadStreamPromise = streamLoader.loadStream(stream.id, 1) + + const queryMessage = await queryPublished + + // publish 2 tips back-to-back. The loadStream should consider both tips + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[0]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1]))) + + const loaded = await loadStreamPromise + + expect(loaded.log.length).toEqual(2) + expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED) + expect(loaded.content).toEqual(CONTENT0) + }) + + test('Considers multiple tips - first tip better', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const loadStreamPromise = streamLoader.loadStream(stream.id, 1) + + const queryMessage = await queryPublished + + // publish 2 tips back-to-back. The loadStream should consider both tips. + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[0]))) + + const loaded = await loadStreamPromise + + expect(loaded.log.length).toEqual(2) + expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED) + expect(loaded.content).toEqual(CONTENT0) + }) + + test('Considers many tips', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const loadStreamPromise = streamLoader.loadStream(stream.id, 1) + + const queryMessage = await queryPublished + + // publish multiple tips back-to-back. The loadStream should consider them all and pick the + // best state + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[3]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[0]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[4]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[2]))) + + const loaded = await loadStreamPromise + + expect(loaded.log.length).toEqual(5) + expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED) + expect(loaded.content).toEqual(CONTENT2) + }) + + test('Considers multiple tips - first tip invalid', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const loadStreamPromise = streamLoader.loadStream(stream.id, 1) + + const queryMessage = await queryPublished + + const invalidTip = TestUtils.randomCID() + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, invalidTip))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1]))) + + const loaded = await loadStreamPromise + + expect(loaded.log.length).toEqual(2) + expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED) + expect(loaded.content).toEqual(CONTENT0) + }) + }) + + describe('syncStream', () => { + test('Invalid tip', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const syncStreamPromise = streamLoader.syncStream(states[1], 1) + + const queryMessage = await queryPublished + + const invalidTip = TestUtils.randomCID() + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, invalidTip))) + + const loaded = await syncStreamPromise + + // If no valid tip is received, state should be unmodified + expect(loaded).toEqual(states[1]) + expect(loaded.log.length).toEqual(2) + expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED) + expect(loaded.content).toEqual(CONTENT0) + }) + + test('Considers multiple tips - second tip better', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const syncStreamPromise = streamLoader.syncStream(states[0], 1) + + const queryMessage = await queryPublished + + // publish 2 tips back-to-back. The loadStream should consider both tips + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[3]))) + + const loaded = await syncStreamPromise + + expect(loaded).toEqual(states[3]) + expect(loaded.log.length).toEqual(4) + expect(loaded.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + expect(loaded.content).toEqual(CONTENT0) + expect(loaded.next.content).toEqual(CONTENT2) + }) + + test('Considers multiple tips - first tip better', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const syncStreamPromise = streamLoader.syncStream(states[0], 1) + + const queryMessage = await queryPublished + + // publish 2 tips back-to-back. The loadStream should consider both tips + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[3]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1]))) + + const loaded = await syncStreamPromise + + expect(loaded).toEqual(states[3]) + expect(loaded.log.length).toEqual(4) + expect(loaded.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + expect(loaded.content).toEqual(CONTENT0) + expect(loaded.next.content).toEqual(CONTENT2) + }) + + test('Considers many tips', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const syncStreamPromise = streamLoader.syncStream(states[1], 1) + + const queryMessage = await queryPublished + + // publish multiple tips back-to-back. The loadStream should consider them all and pick the + // best state + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[3]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[0]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[4]))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[2]))) + + const loaded = await syncStreamPromise + + expect(loaded).toEqual(states[4]) + expect(loaded.log.length).toEqual(5) + expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED) + expect(loaded.content).toEqual(CONTENT2) + }) + + test('Considers multiple tips - first tip invalid', async () => { + const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish) + + const syncStreamPromise = streamLoader.syncStream(states[1], 1) + + const queryMessage = await queryPublished + + const invalidTip = TestUtils.randomCID() + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, invalidTip))) + await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[4]))) + + const loaded = await syncStreamPromise + + expect(loaded).toEqual(states[4]) + expect(loaded.log.length).toEqual(5) + expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED) + expect(loaded.content).toEqual(CONTENT2) + }) + }) +}) diff --git a/packages/core/src/stream-loading/__tests__/tip-fetcher.test.ts b/packages/core/src/stream-loading/__tests__/tip-fetcher.test.ts index 3b74d0f475..9755752af3 100644 --- a/packages/core/src/stream-loading/__tests__/tip-fetcher.test.ts +++ b/packages/core/src/stream-loading/__tests__/tip-fetcher.test.ts @@ -6,6 +6,7 @@ import { IpfsApi, TestUtils } from '@ceramicnetwork/common' import { deserialize, MsgType, PubsubMessage, serialize } from '../../pubsub/pubsub-message.js' import type { SignedMessage } from '@libp2p/interface-pubsub' import { TipFetcher } from '../tip-fetcher.js' +import { lastValueFrom } from 'rxjs' const TOPIC = '/ceramic/test12345' @@ -49,7 +50,7 @@ describe('TipFetcher test', () => { const tipFetcher = new TipFetcher(dispatcher.messageBus) - const foundTip = await tipFetcher.findTip(streamID, 5) + const foundTip = await lastValueFrom(tipFetcher.findPossibleTips(streamID, 5)) expect(foundTip.toString()).toEqual(tip.toString()) await ipfs2.stop() @@ -60,7 +61,9 @@ describe('TipFetcher test', () => { const tipFetcher = new TipFetcher(dispatcher.messageBus) - const foundTip = await tipFetcher.findTip(streamID, 1) + const foundTip = await lastValueFrom(tipFetcher.findPossibleTips(streamID, 1), { + defaultValue: null, + }) expect(foundTip).toEqual(null) }) }) diff --git a/packages/core/src/stream-loading/stream-loader.ts b/packages/core/src/stream-loading/stream-loader.ts index 56d28de7be..7d454630d8 100644 --- a/packages/core/src/stream-loading/stream-loader.ts +++ b/packages/core/src/stream-loading/stream-loader.ts @@ -5,6 +5,8 @@ import { AnchorTimestampExtractor } from './anchor-timestamp-extractor.js' import { DiagnosticsLogger, StreamState, StreamUtils } from '@ceramicnetwork/common' import { CommitID, StreamID } from '@ceramicnetwork/streamid' import { applyTipToState } from './apply-tip-helper.js' +import { concatMap, lastValueFrom } from 'rxjs' +import { CID } from 'multiformats/cid' /** * Class to contain all the logic for loading a stream, including fetching the relevant commit @@ -21,20 +23,64 @@ export class StreamLoader { private readonly stateManipulator: StateManipulator ) {} + async _loadStateFromTip(streamID: StreamID, tip: CID): Promise { + let logWithoutTimestamps + try { + logWithoutTimestamps = await this.logSyncer.syncFullLog(streamID, tip) + } catch (err) { + this.logger.warn( + `Error while syncing log for tip ${tip} received from pubsub, for StreamID ${streamID}: ${err}` + ) + + return null + } + const logWithTimestamps = await this.anchorTimestampExtractor.verifyAnchorAndApplyTimestamps( + logWithoutTimestamps + ) + return this.stateManipulator.applyFullLog(streamID.type, logWithTimestamps, { + throwOnInvalidCommit: false, + }) + } + /** * Completely loads the current state of a Stream from the p2p network just from the StreamID. * @param streamID * @param syncTimeoutSecs */ async loadStream(streamID: StreamID, syncTimeoutSecs: number): Promise { - const tip = await this.tipFetcher.findTip(streamID, syncTimeoutSecs) - const logWithoutTimestamps = await this.logSyncer.syncFullLog(streamID, tip) - const logWithTimestamps = await this.anchorTimestampExtractor.verifyAnchorAndApplyTimestamps( - logWithoutTimestamps + const tipSource$ = await this.tipFetcher.findPossibleTips(streamID, syncTimeoutSecs) + let state + state = await lastValueFrom( + tipSource$.pipe( + concatMap(async (tip) => { + if (!state) { + // this is the first tip response, generate a new StreamState by syncing the tip. + state = await this._loadStateFromTip(streamID, tip) + return state + } else { + // This is not the first tip response we've seen, so instead of generating a completely + // new StreamState, we apply the received tip to the state we already have. + state = await applyTipToState( + this.logSyncer, + this.anchorTimestampExtractor, + this.stateManipulator, + state, + tip, + { throwOnInvalidCommit: false, throwOnConflict: false, throwIfStale: false } + ) + return state + } + }) + ), + { defaultValue: null } ) - return this.stateManipulator.applyFullLog(streamID.type, logWithTimestamps, { - throwOnInvalidCommit: false, - }) + + if (state) { + return state + } + + // We got no valid tip response, so return the genesis state. + return this._loadStateFromTip(streamID, streamID.cid) } /** @@ -45,26 +91,36 @@ export class StreamLoader { */ async syncStream(state: StreamState, syncTimeoutSecs: number): Promise { const streamID = StreamUtils.streamIdFromState(state) - const tip = await this.tipFetcher.findTip(streamID, syncTimeoutSecs) + const tipSource$ = await this.tipFetcher.findPossibleTips(streamID, syncTimeoutSecs) - return applyTipToState( - this.logSyncer, - this.anchorTimestampExtractor, - this.stateManipulator, - state, - tip, - { - throwOnInvalidCommit: false, - throwIfStale: false, - throwOnConflict: false, - } + return lastValueFrom( + tipSource$.pipe( + concatMap(async (tip) => { + try { + state = await applyTipToState( + this.logSyncer, + this.anchorTimestampExtractor, + this.stateManipulator, + state, + tip, + { throwOnInvalidCommit: false, throwOnConflict: false, throwIfStale: false } + ) + } catch (err) { + this.logger.warn( + `Error while applying tip ${tip} received from pubsub, to StreamID ${streamID}: ${err}` + ) + } + return state + }) + ), + { defaultValue: state } ) } /** * Given the currently known about StreamState for a Stream, return the state of that stream * at a specific CommitID. - * @param state + * @param initialState * @param commitId */ async stateAtCommit(initialState: StreamState, commitId: CommitID): Promise { diff --git a/packages/core/src/stream-loading/tip-fetcher.ts b/packages/core/src/stream-loading/tip-fetcher.ts index 8202f460eb..769b88a071 100644 --- a/packages/core/src/stream-loading/tip-fetcher.ts +++ b/packages/core/src/stream-loading/tip-fetcher.ts @@ -1,6 +1,6 @@ import { StreamID } from '@ceramicnetwork/streamid' import { CID } from 'multiformats/cid' -import { lastValueFrom, Observable, timer, takeUntil } from 'rxjs' +import { Observable, timer, takeUntil } from 'rxjs' interface IPFSPubsubQuerier { queryNetwork(streamId: StreamID): Observable @@ -15,18 +15,16 @@ export class TipFetcher { constructor(private readonly pubsubQuerier: IPFSPubsubQuerier) {} /** - * Queries pubsub for the current tip for the given StreamID. If no response messages come - * back within 'syncTimeoutSecs', returns null. Note that there's no guarantee that the CID - * that comes back from this is *actually* a valid tip for this stream, that validation needs to - * happen later. + * Queries pubsub for the current tip for the given StreamID. Returns an Observable that emits + * all tip responses until `syncTimeoutSeconds` seconds pass. + * Note that there's no guarantee that the CIDs emitted from this are *actually* valid tips for + * this stream, that validation needs to happen later. * @param streamID * @param syncTimeoutSecs */ - async findTip(streamID: StreamID, syncTimeoutSecs: number): Promise { + findPossibleTips(streamID: StreamID, syncTimeoutSecs: number): Observable { const tipSource$ = this.pubsubQuerier.queryNetwork(streamID) const timeoutMillis = syncTimeoutSecs * 1000 - return lastValueFrom(tipSource$.pipe(takeUntil(timer(timeoutMillis))), { - defaultValue: null, - }) + return tipSource$.pipe(takeUntil(timer(timeoutMillis))) } }