Skip to content

Commit

Permalink
feat: Make StreamLoader consider multiple tip responses when syncing …
Browse files Browse the repository at this point in the history
…a stream (#2962)
  • Loading branch information
stbrody authored Sep 20, 2023
1 parent 535172e commit 07019aa
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 34 deletions.
336 changes: 333 additions & 3 deletions packages/core/src/stream-loading/__tests__/stream-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,16 @@ import { StreamLoader } from '../stream-loader.js'
import { TipFetcher } from '../tip-fetcher.js'
import { AnchorTimestampExtractor } from '../anchor-timestamp-extractor.js'
import { InMemoryAnchorService } from '../../anchor/memory/in-memory-anchor-service.js'
import { CommitID } from '@ceramicnetwork/streamid'
import { CommitID, StreamID } from '@ceramicnetwork/streamid'
import cloneDeep from 'lodash.clonedeep'
import { CID } from 'multiformats/cid'
import {
MsgType,
PubsubMessage,
QueryMessage,
ResponseMessage,
} from '../../pubsub/pubsub-message.js'
import { asIpfsMessage } from '../../pubsub/__tests__/as-ipfs-message.js'

const TOPIC = '/ceramic/test12345'
const CONTENT0 = { step: 0 }
Expand All @@ -42,7 +50,7 @@ function expectStatesEqualWithPendingAnchor(
)
}

describe('StreamLoader test', () => {
describe('StreamLoader querying against real Ceramic node', () => {
jest.setTimeout(1000 * 30)

let dispatcher: Dispatcher
Expand All @@ -59,7 +67,8 @@ describe('StreamLoader test', () => {
dispatcherIpfs = await createIPFS()
dispatcher = await createDispatcher(dispatcherIpfs, TOPIC)
// speed up how quickly the dispatcher gives up on loading a non-existent commit from ipfs.
dispatcher._ipfsTimeout = 1000
dispatcher._ipfsTimeout = 100
await dispatcher.init()

const logger = new LoggerProvider().getDiagnosticsLogger()
const tipFetcher = new TipFetcher(dispatcher.messageBus)
Expand Down Expand Up @@ -248,3 +257,324 @@ describe('StreamLoader test', () => {
})
})
})

/**
* Intercepts the function that publishes messages to pubsub and returns the first query message
* published (while still passing it on to pubsub as normal).
*/
function getQueryPublishedPromise(
dispatcher: Dispatcher,
streamID: StreamID,
originalPubsubPublish
): Promise<QueryMessage> {
const pubsubPublishSpy = jest.spyOn(dispatcher.messageBus.pubsub, 'next')
return new Promise((resolve) => {
pubsubPublishSpy.mockImplementation((message: PubsubMessage) => {
if (message.typ == MsgType.QUERY && message.stream.equals(streamID)) {
resolve(message)
}
return originalPubsubPublish(message)
})
})
}

function makeResponse(streamID: StreamID, queryId: string, cid: CID): ResponseMessage {
const tipMap = new Map().set(streamID.toString(), cid)
const response = { typ: MsgType.RESPONSE as const, id: queryId, tips: tipMap }
return response
}

describe('StreamLoader querying against mocked pubsub responses', () => {
jest.setTimeout(1000 * 30)

let ipfs: IpfsApi

let dispatcher: Dispatcher
let streamLoader: StreamLoader

let stream: TileDocument
let commitCids: Array<CID>
const states: Array<StreamState> = []

let receiveMessage
let originalPubsubPublish

beforeAll(async () => {
ipfs = await createIPFS()
const ceramic = await createCeramic(ipfs, { pubsubTopic: TOPIC })

dispatcher = await createDispatcher(ipfs, TOPIC)
// speed up how quickly the dispatcher gives up on loading a non-existent commit from ipfs.
dispatcher._ipfsTimeout = 100

// Intercept the function passed to ipfs on pubsub subscription so that we can publish new
// messages directly
const originalPubsubSubscribe = ipfs.pubsub.subscribe.bind(ipfs.pubsub)
const pubsubSubscribeSpy = jest.spyOn(ipfs.pubsub, 'subscribe')
pubsubSubscribeSpy.mockImplementation(async (topic, onMessageCallback, opts) => {
receiveMessage = onMessageCallback
return originalPubsubSubscribe(topic, onMessageCallback, opts)
})
originalPubsubPublish = dispatcher.messageBus.pubsub.next.bind(dispatcher.messageBus.pubsub)

const logger = new LoggerProvider().getDiagnosticsLogger()
const tipFetcher = new TipFetcher(dispatcher.messageBus)
const logSyncer = new LogSyncer(dispatcher)
const anchorTimestampExtractor = new AnchorTimestampExtractor(
logger,
dispatcher,
ceramic.context.anchorService as InMemoryAnchorService
)
const handlers = new HandlersMap(logger)
const stateManipulator = new StateManipulator(
logger,
handlers,
{ did: ceramic.did, api: ceramic } as Context,
logSyncer
)
streamLoader = new StreamLoader(
logger,
tipFetcher,
logSyncer,
anchorTimestampExtractor,
stateManipulator
)

stream = await TileDocument.create(ceramic, CONTENT0)
states.push(stream.state)
await TestUtils.anchorUpdate(ceramic, stream)
states.push(stream.state)
await stream.update(CONTENT1)
states.push(stream.state)
await stream.update(CONTENT2)
states.push(stream.state)
await TestUtils.anchorUpdate(ceramic, stream)
states.push(stream.state)
commitCids = stream.allCommitIds.map((commitId) => commitId.commit)
expect(commitCids.length).toEqual(5)
// When we load the state from the network we won't ever see a pending anchor status.
states.forEach((state) => {
if (state.anchorStatus == AnchorStatus.PENDING) {
state.anchorStatus = AnchorStatus.NOT_REQUESTED
}
})
expect(states[0].log.length).toEqual(1)
expect(states[1].log.length).toEqual(2)
expect(states[2].log.length).toEqual(3)
expect(states[3].log.length).toEqual(4)
expect(states[4].log.length).toEqual(5)

// Close the Ceramic node before tests start so it won't respond to any pubsub messages.
await ceramic.close()
})

afterAll(async () => {
await dispatcher.close()

// Wait for pubsub unsubscribe to be processed
// TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe
// has been processed
await TestUtils.delay(5000)

await ipfs.stop()
})

describe('loadStream', () => {
test('Invalid tip', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const loadStreamPromise = streamLoader.loadStream(stream.id, 1)

const queryMessage = await queryPublished

const invalidTip = TestUtils.randomCID()
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, invalidTip)))

const loaded = await loadStreamPromise

// If no valid tip is received, should return genesis state
expect(loaded.log.length).toEqual(1)
expect(loaded.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)
expect(loaded.content).toEqual(CONTENT0)
})

test('Considers multiple tips - second tip better', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const loadStreamPromise = streamLoader.loadStream(stream.id, 1)

const queryMessage = await queryPublished

// publish 2 tips back-to-back. The loadStream should consider both tips
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[0])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1])))

const loaded = await loadStreamPromise

expect(loaded.log.length).toEqual(2)
expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED)
expect(loaded.content).toEqual(CONTENT0)
})

test('Considers multiple tips - first tip better', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const loadStreamPromise = streamLoader.loadStream(stream.id, 1)

const queryMessage = await queryPublished

// publish 2 tips back-to-back. The loadStream should consider both tips.
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[0])))

const loaded = await loadStreamPromise

expect(loaded.log.length).toEqual(2)
expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED)
expect(loaded.content).toEqual(CONTENT0)
})

test('Considers many tips', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const loadStreamPromise = streamLoader.loadStream(stream.id, 1)

const queryMessage = await queryPublished

// publish multiple tips back-to-back. The loadStream should consider them all and pick the
// best state
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[3])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[0])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[4])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[2])))

const loaded = await loadStreamPromise

expect(loaded.log.length).toEqual(5)
expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED)
expect(loaded.content).toEqual(CONTENT2)
})

test('Considers multiple tips - first tip invalid', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const loadStreamPromise = streamLoader.loadStream(stream.id, 1)

const queryMessage = await queryPublished

const invalidTip = TestUtils.randomCID()
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, invalidTip)))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1])))

const loaded = await loadStreamPromise

expect(loaded.log.length).toEqual(2)
expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED)
expect(loaded.content).toEqual(CONTENT0)
})
})

describe('syncStream', () => {
test('Invalid tip', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const syncStreamPromise = streamLoader.syncStream(states[1], 1)

const queryMessage = await queryPublished

const invalidTip = TestUtils.randomCID()
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, invalidTip)))

const loaded = await syncStreamPromise

// If no valid tip is received, state should be unmodified
expect(loaded).toEqual(states[1])
expect(loaded.log.length).toEqual(2)
expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED)
expect(loaded.content).toEqual(CONTENT0)
})

test('Considers multiple tips - second tip better', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const syncStreamPromise = streamLoader.syncStream(states[0], 1)

const queryMessage = await queryPublished

// publish 2 tips back-to-back. The loadStream should consider both tips
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[3])))

const loaded = await syncStreamPromise

expect(loaded).toEqual(states[3])
expect(loaded.log.length).toEqual(4)
expect(loaded.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)
expect(loaded.content).toEqual(CONTENT0)
expect(loaded.next.content).toEqual(CONTENT2)
})

test('Considers multiple tips - first tip better', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const syncStreamPromise = streamLoader.syncStream(states[0], 1)

const queryMessage = await queryPublished

// publish 2 tips back-to-back. The loadStream should consider both tips
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[3])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1])))

const loaded = await syncStreamPromise

expect(loaded).toEqual(states[3])
expect(loaded.log.length).toEqual(4)
expect(loaded.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)
expect(loaded.content).toEqual(CONTENT0)
expect(loaded.next.content).toEqual(CONTENT2)
})

test('Considers many tips', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const syncStreamPromise = streamLoader.syncStream(states[1], 1)

const queryMessage = await queryPublished

// publish multiple tips back-to-back. The loadStream should consider them all and pick the
// best state
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[1])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[3])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[0])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[4])))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[2])))

const loaded = await syncStreamPromise

expect(loaded).toEqual(states[4])
expect(loaded.log.length).toEqual(5)
expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED)
expect(loaded.content).toEqual(CONTENT2)
})

test('Considers multiple tips - first tip invalid', async () => {
const queryPublished = getQueryPublishedPromise(dispatcher, stream.id, originalPubsubPublish)

const syncStreamPromise = streamLoader.syncStream(states[1], 1)

const queryMessage = await queryPublished

const invalidTip = TestUtils.randomCID()
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, invalidTip)))
await receiveMessage(asIpfsMessage(makeResponse(stream.id, queryMessage.id, commitCids[4])))

const loaded = await syncStreamPromise

expect(loaded).toEqual(states[4])
expect(loaded.log.length).toEqual(5)
expect(loaded.anchorStatus).toEqual(AnchorStatus.ANCHORED)
expect(loaded.content).toEqual(CONTENT2)
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { IpfsApi, TestUtils } from '@ceramicnetwork/common'
import { deserialize, MsgType, PubsubMessage, serialize } from '../../pubsub/pubsub-message.js'
import type { SignedMessage } from '@libp2p/interface-pubsub'
import { TipFetcher } from '../tip-fetcher.js'
import { lastValueFrom } from 'rxjs'

const TOPIC = '/ceramic/test12345'

Expand Down Expand Up @@ -49,7 +50,7 @@ describe('TipFetcher test', () => {

const tipFetcher = new TipFetcher(dispatcher.messageBus)

const foundTip = await tipFetcher.findTip(streamID, 5)
const foundTip = await lastValueFrom(tipFetcher.findPossibleTips(streamID, 5))
expect(foundTip.toString()).toEqual(tip.toString())

await ipfs2.stop()
Expand All @@ -60,7 +61,9 @@ describe('TipFetcher test', () => {

const tipFetcher = new TipFetcher(dispatcher.messageBus)

const foundTip = await tipFetcher.findTip(streamID, 1)
const foundTip = await lastValueFrom(tipFetcher.findPossibleTips(streamID, 1), {
defaultValue: null,
})
expect(foundTip).toEqual(null)
})
})
Loading

0 comments on commit 07019aa

Please sign in to comment.