Skip to content

Commit

Permalink
chore: Cache more of IPLD records (#2890)
Browse files Browse the repository at this point in the history
* chore: types

* chore: Use dispatcher cache
  • Loading branch information
ukstv authored Aug 11, 2023
1 parent f94fb50 commit cb82115
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 22 deletions.
12 changes: 5 additions & 7 deletions packages/core/src/__tests__/dispatcher-mock-ipfs.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { expect, jest } from '@jest/globals'
import { expect, jest, it, describe, beforeEach, afterEach } from '@jest/globals'
import { Dispatcher } from '../dispatcher.js'
import { CID } from 'multiformats/cid'
import { StreamID } from '@ceramicnetwork/streamid'
Expand Down Expand Up @@ -36,7 +36,7 @@ const mock_ipfs = {
dag: {
put: jest.fn(),
get: jest.fn(),
resolve: jest.fn(async (cid: CID, opts?: any) => {
resolve: jest.fn(async (cid: CID) => {
return { cid: cid }
}),
import: jest.fn(() => {
Expand All @@ -50,7 +50,7 @@ const mock_ipfs = {
id: async () => ({ id: 'ipfsid' }),
codecs: {
listCodecs: () => [dagJoseCodec, dagCborCodec],
getCodec: (codename) =>
getCodec: (codename: string | number) =>
[dagJoseCodec, dagCborCodec].find(
(codec) => codec.code === codename || codec.name === codename
),
Expand All @@ -76,7 +76,7 @@ describe('Dispatcher with mock ipfs', () => {
const levelPath = await tmp.tmpName()
const levelStore = new LevelDbStore(levelPath, 'test')
const stateStore = new StreamStateStore(loggerProvider.getDiagnosticsLogger())
stateStore.open(levelStore)
await stateStore.open(levelStore)
repository = new Repository(100, 100, loggerProvider.getDiagnosticsLogger())
const pinStore = {
stateStore,
Expand Down Expand Up @@ -178,9 +178,7 @@ describe('Dispatcher with mock ipfs', () => {
})

const blockGetSpy = ipfs.block.get
blockGetSpy.mockImplementation(async function (cid: CID, opts: any) {
return carFile.blocks.get(cid).payload
})
blockGetSpy.mockImplementation(async (cid: CID) => carFile.blocks.get(cid).payload)

expect(await dispatcher.retrieveFromIPFS(FAKE_CID, '/foo')).toEqual('foo')
// CID+path not found in cache so IPFS lookup performed and cache updated
Expand Down
24 changes: 16 additions & 8 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ export class Dispatcher {
this.messageBus.subscribe(this.handleMessage.bind(this))
this.dagNodeCache = new lru.LRUMap<string, any>(IPFS_CACHE_SIZE)
this.carFactory = new CARFactory()
this._ipfs.codecs.listCodecs().forEach((codec) => {
for (const codec of this._ipfs.codecs.listCodecs()) {
this.carFactory.codecs.add(codec)
})
}
}

async ipfsNodeStatus(): Promise<IpfsNodeStatus> {
Expand All @@ -156,9 +156,12 @@ export class Dispatcher {
}

async storeRecord(record: Record<string, unknown>): Promise<CID> {
return await this._shutdownSignal.abortable((signal) => {
return this._ipfs.dag.put(record, { signal: signal })
})
return this._shutdownSignal
.abortable((signal) => this._ipfs.dag.put(record, { signal: signal }))
.then((cid) => {
this.dagNodeCache.set(cid.toString(), record)
return cid
})
}

async getIpfsBlock(cid: CID): Promise<Uint8Array> {
Expand All @@ -171,8 +174,8 @@ export class Dispatcher {
* Stores all the blocks in the given CAR file into the local IPFS node.
* @param car
*/
async importCAR(car: CAR): Promise<void> {
return await this._shutdownSignal.abortable(async (signal) => {
importCAR(car: CAR): Promise<void> {
return this._shutdownSignal.abortable(async (signal) => {
await all(this._ipfs.dag.import(car, { signal, pinRoots: false }))
})
}
Expand All @@ -194,18 +197,23 @@ export class Dispatcher {
const capCID = CID.parse(decodedProtectedHeader.cap.replace('ipfs://', ''))
carFile.blocks.put(new CarBlock(capCID, cacaoBlock))
restrictBlockSize(cacaoBlock, capCID)
this.dagNodeCache.set(capCID.toString(), carFile.get(capCID))
}

carFile.blocks.put(new CarBlock(jws.link, linkedBlock)) // Encode payload
const payloadCID = jws.link
carFile.blocks.put(new CarBlock(payloadCID, linkedBlock)) // Encode payload
restrictBlockSize(linkedBlock, jws.link)
this.dagNodeCache.set(payloadCID.toString(), carFile.get(payloadCID))
const cid = carFile.put(jws, { codec: 'dag-jose', hasher: 'sha2-256', isRoot: true }) // Encode JWS itself
restrictBlockSize(carFile.blocks.get(cid).payload, cid)
this.dagNodeCache.set(cid.toString(), carFile.get(cid))
await this.importCAR(carFile)
Metrics.count(COMMITS_STORED, 1)
return cid
}
const cid = carFile.put(data, { isRoot: true })
restrictBlockSize(carFile.blocks.get(cid).payload, cid)
this.dagNodeCache.set(cid.toString(), carFile.get(cid))
await this.importCAR(carFile)
Metrics.count(COMMITS_STORED, 1)
return cid
Expand Down
10 changes: 3 additions & 7 deletions packages/core/src/state-management/running-state.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import { Subscription } from 'rxjs'
import {
StreamState,
RunningStateLike,
StreamStateSubject,
SubscriptionSet,
} from '@ceramicnetwork/common'
import { StreamStateSubject, SubscriptionSet } from '@ceramicnetwork/common'
import { StreamID } from '@ceramicnetwork/streamid'
import type { Subscription } from 'rxjs'
import type { StreamState, RunningStateLike } from '@ceramicnetwork/common'
import type { CID } from 'multiformats/cid'

export class RunningState extends StreamStateSubject implements RunningStateLike {
Expand Down

0 comments on commit cb82115

Please sign in to comment.