Skip to content

Commit

Permalink
refactor: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed May 15, 2024
1 parent fdaa475 commit 679d792
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 62 deletions.
51 changes: 45 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"@ipld/dag-json": "^10.1.7",
"@ipld/dag-pb": "^4.0.8",
"@web3-storage/content-claims": "^4.0.5",
"@web3-storage/gateway-lib": "github:web3-storage/gateway-lib#5a027e05be62f985407b46bce70748f543d302b7",
"@web3-storage/gateway-lib": "^5.0.0",
"cardex": "^3.0.0",
"dagula": "^7.3.0",
"http-range-parse": "^1.0.0",
Expand Down
39 changes: 10 additions & 29 deletions src/lib/blockstore.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { readBlockHead, asyncIterableReader } from '@ipld/car/decoder'
import { base58btc } from 'multiformats/bases/base58'
import * as Link from 'multiformats/link'
import defer from 'p-defer'
import { OrderedCarBlockBatcher } from './block-batch.js'
import * as IndexEntry from './dag-index/entry.js'
Expand Down Expand Up @@ -39,17 +38,11 @@ export class R2Blockstore {
if (!entry) return

if (IndexEntry.isLocated(entry)) {
// if host is "w3s.link" then content can be found in CARPARK
const url = entry.site.location.find(l => l.hostname === 'w3s.link')
// TODO: allow fetch from _any_ URL
if (!url) return

const link = Link.parse(url.pathname.split('/')[2])
const digestString = base58btc.encode(link.multihash.bytes)
const key = `${digestString}/${digestString}.blob`
const res = await this._dataBucket.get(key, { range: entry.site.range })
if (!res) return
return { cid, bytes: new Uint8Array(await res.arrayBuffer()) }
const keyAndOptions = IndexEntry.toBucketGet(entry)
if (!keyAndOptions) return

const res = await this._dataBucket.get(keyAndOptions[0], keyAndOptions[1])
return res ? { cid, bytes: new Uint8Array(await res.arrayBuffer()) } : undefined
}

const carPath = `${entry.origin}/${entry.origin}.car`
Expand Down Expand Up @@ -91,23 +84,11 @@ export class R2Blockstore {
const entry = await this._idx.get(cid)
if (!entry) return

// stream API exists only for blobs (i.e. location claimed)
if (IndexEntry.isLocated(entry)) {
// if host is "w3s.link" then content can be found in CARPARK
const url = entry.site.location.find(l => l.hostname === 'w3s.link')
// TODO: allow fetch from any URL
if (!url) return

const link = Link.parse(url.pathname.split('/')[2])
const digestString = base58btc.encode(link.multihash.bytes)
const key = `${digestString}/${digestString}.blob`
const first = entry.site.range.offset + (options?.range?.[0] ?? 0)
const last = entry.site.range.offset + (options?.range?.[1] ?? (entry.site.range.length - 1))
const range = { offset: first, length: last - first + 1 }
const res = await this._dataBucket.get(key, { range })
if (!res) return
return /** @type {ReadableStream<Uint8Array>} */ (res.body)
}
const keyAndOptions = IndexEntry.toBucketGet(entry, options)
if (!keyAndOptions) return

const res = await this._dataBucket.get(keyAndOptions[0], keyAndOptions[1])
return /** @type {ReadableStream<Uint8Array>|undefined} */ (res?.body)
}
}

Expand Down
31 changes: 31 additions & 0 deletions src/lib/dag-index/entry.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import * as Link from 'multiformats/link'
import { base58btc } from 'multiformats/bases/base58'
import { CAR_CODE } from '../../constants.js'

/**
* An index entry is "located" if a content claim has specified its location
* i.e. it is of type `LocatedIndexEntry`.
Expand All @@ -6,3 +10,30 @@
* @returns {entry is import('./api.js').LocatedIndexEntry}
*/
export const isLocated = (entry) => Reflect.has(entry, 'site')

/**
 * Build the `[key, options]` pair for a bucket `get` call that reads the bytes
 * a located index entry points at.
 *
 * Returns `undefined` when the entry is not located, or when none of its
 * location URLs has the hostname "w3s.link".
 *
 * @param {import('./api.js').IndexEntry} entry
 * @param {import('dagula').RangeOptions} [options] Optional `range` whose
 * first/last positions are added to the offset of the entry's own range.
 * @returns {[key: string, options: import('@cloudflare/workers-types').R2GetOptions]|undefined}
 */
export const toBucketGet = (entry, options) => {
  if (!isLocated(entry)) return

  // if host is "w3s.link" then content can be found in CARPARK
  const carparkURL = entry.site.location.find(({ hostname }) => hostname === 'w3s.link')
  if (!carparkURL) return

  // the link is the path segment at index 2 when the pathname is split on "/"
  const link = Link.parse(carparkURL.pathname.split('/')[2])

  /** @param {string} digestString */
  const blobKey = digestString => `${digestString}/${digestString}.blob`

  // CAR links are keyed by the link string, anything else by its base58btc
  // encoded multihash.
  const key = link.code === CAR_CODE
    ? `${link}/${link}.car`
    : blobKey(base58btc.encode(link.multihash.bytes))

  // Requested positions are added to the entry's offset; with no requested
  // range the whole entry (offset .. offset + length - 1) is selected.
  const { offset: base, length: size } = entry.site.range
  const first = base + (options?.range?.[0] ?? 0)
  const last = base + (options?.range?.[1] ?? (size - 1))

  return [key, { range: { offset: first, length: last - first + 1 } }]
}
15 changes: 10 additions & 5 deletions test/helpers/builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { pack } from 'ipfs-car/pack'
import * as Link from 'multiformats/link'
import { sha256 } from 'multiformats/hashes/sha2'
import { base58btc } from 'multiformats/bases/base58'
import * as raw from 'multiformats/codecs/raw'
import { CarIndexer } from '@ipld/car'
import { concat } from 'uint8arrays'
import { TreewalkCarSplitter } from 'carbites/treewalk'
Expand All @@ -13,7 +14,7 @@ import * as CAR from './car.js'

/**
* @typedef {import('multiformats').ToString<import('multiformats').MultihashDigest, 'z'>} MultihashString
* @typedef {import('multiformats').ToString<import('cardex/api.js').CARLink>} ShardCID
* @typedef {import('multiformats').ToString<import('cardex/api').CARLink>} ShardCID
* @typedef {number} Offset
*/

Expand Down Expand Up @@ -107,32 +108,36 @@ export class Builder {

/** @type {import('cardex/api').CARLink[]} */
const carCids = []
/** @type {import('multiformats').Link<Uint8Array, typeof raw.code>[]} */
const blobCids = []
/** @type {Array<{ cid: import('multiformats').Link, carCid: import('cardex/api').CARLink }>} */
const indexes = []
const splitter = await TreewalkCarSplitter.fromIterable(out, TARGET_SHARD_SIZE)

for await (const car of splitter.cars()) {
const carBytes = concat(await collect(car))
const carCid = Link.create(CAR.code, await sha256.digest(carBytes))

if (options.asBlob) {
this.#carpark.put(toBlobKey(carCid.multihash), carBytes)
const blobCid = Link.create(raw.code, await sha256.digest(carBytes))
this.#carpark.put(toBlobKey(blobCid.multihash), carBytes)
blobCids.push(blobCid)
} else {
const carCid = Link.create(CAR.code, await sha256.digest(carBytes))
this.#carpark.put(toCarKey(carCid), carBytes)
if (options.satnav ?? true) {
await this.#writeIndex(carCid, carBytes)
}
indexes.push(await this.#writeIndexCar(carBytes))
carCids.push(carCid)
}
carCids.push(carCid)
}

if (!options.asBlob && (options.dudewhere ?? true)) {
// @ts-ignore old multiformats in ipfs-car
await this.#writeLinks(dataCid, carCids)
}

return { dataCid, carCids, indexes }
return { dataCid, carCids, indexes, blobCids }
}

/**
Expand Down
8 changes: 4 additions & 4 deletions test/helpers/content-claims.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ export const generateClaims = async (signer, dataCid, carCid, carStream, indexCi

/**
* @param {import('@ucanto/interface').Signer} signer
* @param {import('cardex/api').CARLink} carCid
* @param {import('multiformats').Link} shard
* @param {ReadableStream<Uint8Array>} carStream CAR file data
*/
export const generateBlockLocationClaims = async (signer, carCid, carStream) => {
export const generateBlockLocationClaims = async (signer, shard, carStream) => {
/** @type {Claims} */
const claims = new LinkMap()

Expand All @@ -131,8 +131,8 @@ export const generateBlockLocationClaims = async (signer, carCid, carStream) =>
.pipeTo(new WritableStream({
async write ({ cid, blockOffset, blockLength }) {
const blocks = claims.get(cid) ?? []
const location = new URL(`https://w3s.link/ipfs/${carCid}?format=raw`)
blocks.push(await generateLocationClaim(signer, carCid, location, blockOffset, blockLength))
const location = new URL(`https://w3s.link/ipfs/${shard}?format=raw`)
blocks.push(await generateLocationClaim(signer, shard, location, blockOffset, blockLength))
claims.set(cid, blocks)
}
}))
Expand Down
34 changes: 17 additions & 17 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,14 @@ describe('freeway', () => {
it('should use location content claim', async () => {
const input = [{ path: 'sargo.tar.xz', content: randomBytes(MAX_CAR_BYTES_IN_MEMORY + 1) }]
// no dudewhere or satnav so only content claims can satisfy the request
const { dataCid, carCids } = await builder.add(input, { asBlob: true })
const { dataCid, blobCids } = await builder.add(input, { asBlob: true })

const carpark = await miniflare.getR2Bucket('CARPARK')
const res = await carpark.get(toBlobKey(carCids[0].multihash))
const res = await carpark.get(toBlobKey(blobCids[0].multihash))
assert(res)

// @ts-expect-error nodejs ReadableStream does not implement ReadableStream interface correctly
const claims = await generateBlockLocationClaims(claimsService.signer, carCids[0], res.body)
const claims = await generateBlockLocationClaims(claimsService.signer, blobCids[0], res.body)
claimsService.setClaims(claims)

const res1 = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${dataCid}/${input[0].path}`)
Expand Down Expand Up @@ -400,24 +400,24 @@ describe('freeway', () => {
it('should GET a byte range of raw block', async () => {
const input = [{ path: 'sargo.tar.xz', content: randomBytes(MAX_CAR_BYTES_IN_MEMORY + 1) }]
// no dudewhere or satnav so only content claims can satisfy the request
const { carCids } = await builder.add(input, { asBlob: true })
const { blobCids } = await builder.add(input, { asBlob: true })

const carpark = await miniflare.getR2Bucket('CARPARK')
const res0 = await carpark.get(toBlobKey(carCids[0].multihash))
const res0 = await carpark.get(toBlobKey(blobCids[0].multihash))
assert(res0)

const url = new URL(`https://w3s.link/ipfs/${carCids[0]}?format=raw`)
const claim = await generateLocationClaim(claimsService.signer, carCids[0], url, 0, input[0].content.length)
claimsService.setClaims(new LinkMap([[carCids[0], [claim]]]))
const url = new URL(`https://w3s.link/ipfs/${blobCids[0]}?format=raw`)
const claim = await generateLocationClaim(claimsService.signer, blobCids[0], url, 0, input[0].content.length)
claimsService.setClaims(new LinkMap([[blobCids[0], [claim]]]))

const res1 = await carpark.get(toBlobKey(carCids[0].multihash))
const res1 = await carpark.get(toBlobKey(blobCids[0].multihash))
assert(res1)

await /** @type {ReadableStream} */ (res1.body)
.pipeThrough(new CARReaderStream())
.pipeTo(new WritableStream({
async write ({ bytes, blockOffset, blockLength }) {
const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${carCids[0]}?format=raw`, {
const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${blobCids[0]}?format=raw`, {
headers: {
Range: `bytes=${blockOffset}-${blockOffset + blockLength - 1}`
}
Expand All @@ -430,22 +430,22 @@ describe('freeway', () => {
assert.equal(contentLength, bytes.length)
assert.equal(res.headers.get('Content-Range'), `bytes ${blockOffset}-${blockOffset + blockLength - 1}/${input[0].content.length}`)
assert.equal(res.headers.get('Content-Type'), 'application/vnd.ipld.raw')
assert.equal(res.headers.get('Etag'), `"${carCids[0]}.raw"`)
assert.equal(res.headers.get('Etag'), `"${blobCids[0]}.raw"`)
}
}))
})

it('should GET a multipart byte range of raw block', async () => {
const input = [{ path: 'sargo.tar.xz', content: randomBytes(MAX_CAR_BYTES_IN_MEMORY + 1) }]
// no dudewhere or satnav so only content claims can satisfy the request
const { carCids } = await builder.add(input, { asBlob: true })
const { blobCids } = await builder.add(input, { asBlob: true })

const url = new URL(`https://w3s.link/ipfs/${carCids[0]}?format=raw`)
const claim = await generateLocationClaim(claimsService.signer, carCids[0], url, 0, input[0].content.length)
claimsService.setClaims(new LinkMap([[carCids[0], [claim]]]))
const url = new URL(`https://w3s.link/ipfs/${blobCids[0]}?format=raw`)
const claim = await generateLocationClaim(claimsService.signer, blobCids[0], url, 0, input[0].content.length)
claimsService.setClaims(new LinkMap([[blobCids[0], [claim]]]))

const carpark = await miniflare.getR2Bucket('CARPARK')
const res0 = await carpark.get(toBlobKey(carCids[0].multihash))
const res0 = await carpark.get(toBlobKey(blobCids[0].multihash))
assert(res0)

/** @type {Array<import('carstream/api').Block & import('carstream/api').Position>} */
Expand All @@ -454,7 +454,7 @@ describe('freeway', () => {
.pipeThrough(new CARReaderStream())
.pipeTo(new WritableStream({ write (block) { blocks.push(block) } }))

const res1 = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${carCids[0]}?format=raw`, {
const res1 = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${blobCids[0]}?format=raw`, {
headers: {
Range: `bytes=${blocks.map(b => `${b.blockOffset}-${b.blockOffset + b.blockLength - 1}`).join(',')}`
}
Expand Down

0 comments on commit 679d792

Please sign in to comment.