feat: support byte range for raw block requests #101

Merged
merged 4 commits into from
May 15, 2024
102 changes: 90 additions & 12 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Expand Up @@ -41,7 +41,7 @@
"@ipld/dag-json": "^10.1.7",
"@ipld/dag-pb": "^4.0.8",
"@web3-storage/content-claims": "^4.0.5",
"@web3-storage/gateway-lib": "^4.1.1",
"@web3-storage/gateway-lib": "^5.0.0",
"cardex": "^3.0.0",
"dagula": "^7.3.0",
"http-range-parse": "^1.0.0",
Expand All @@ -54,6 +54,7 @@
"@cloudflare/workers-types": "^4.20231218.0",
"@ucanto/principal": "^8.1.0",
"ava": "^5.3.1",
"byteranges": "^1.1.0",
"carbites": "^1.0.6",
"carstream": "^2.1.0",
"dotenv": "^16.3.1",
Expand Down
57 changes: 38 additions & 19 deletions src/index.js
Expand Up @@ -4,7 +4,7 @@ import {
withCorsHeaders,
withContentDispositionHeader,
withErrorHandler,
withHttpGet,
createWithHttpMethod,
withCdnCache,
withParsedIpfsUrl,
withFixedLengthStream,
Expand All @@ -19,13 +19,15 @@ import {
withContentClaimsDagula,
withHttpRangeUnsupported,
withVersionHeader,
withCarHandler
withCarBlockHandler
} from './middleware.js'

/**
* @typedef {import('./bindings.js').Environment} Environment
* @typedef {import('@web3-storage/gateway-lib').IpfsUrlContext} IpfsUrlContext
* @typedef {import('@web3-storage/gateway-lib').DagulaContext} DagulaContext
* @typedef {import('@web3-storage/gateway-lib').BlockContext} BlockContext
* @typedef {import('@web3-storage/gateway-lib').DagContext} DagContext
* @typedef {import('@web3-storage/gateway-lib').UnixfsContext} UnixfsContext
*/

export default {
Expand All @@ -37,30 +39,47 @@ export default {
withContext,
withCorsHeaders,
withVersionHeader,
withContentDispositionHeader,
withErrorHandler,
withParsedIpfsUrl,
withCarHandler,
withHttpRangeUnsupported,
withHttpGet,
createWithHttpMethod('GET', 'HEAD'),
withCarBlockHandler,
withContentClaimsDagula,
withFormatRawHandler,
withHttpRangeUnsupported,
withFormatCarHandler,
withContentDispositionHeader,
withFixedLengthStream
)
return middleware(handler)(request, env, ctx)
return middleware(handleUnixfs)(request, env, ctx)
}
}

/** @type {import('@web3-storage/gateway-lib').Handler<DagulaContext & IpfsUrlContext, Environment>} */
async function handler (request, env, ctx) {
const { headers } = request
const { searchParams } = ctx
if (!searchParams) throw new Error('missing URL search params')

if (searchParams.get('format') === 'raw' || headers.get('Accept')?.includes('application/vnd.ipld.raw')) {
return await handleBlock(request, env, ctx)
/**
* @type {import('@web3-storage/gateway-lib').Middleware<BlockContext & UnixfsContext & IpfsUrlContext, BlockContext & UnixfsContext & IpfsUrlContext, Environment>}
*/
export function withFormatRawHandler (handler) {
return async (request, env, ctx) => {
const { headers } = request
const { searchParams } = ctx
if (!searchParams) throw new Error('missing URL search params')

Why require search params? A code comment answering this would be helpful.

Member Author

This middleware requires a searchParams: URLSearchParams on the context, which some other middleware has extracted from the URL before this one runs.
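
The dependency described in this reply can be sketched as follows. This is a minimal illustration, not the gateway-lib implementation: `withSearchParamsSketch`, `withFormatCheckSketch`, `reportFormat`, and the example URL are all hypothetical names introduced here, and the request is a plain object standing in for a real `Request`.

```javascript
// Hypothetical stand-in for the upstream middleware that parses the URL.
// The real one lives in @web3-storage/gateway-lib and is not reproduced here.
function withSearchParamsSketch (handler) {
  return (request, env, ctx) => {
    const { searchParams } = new URL(request.url)
    return handler(request, env, { ...ctx, searchParams })
  }
}

// Simplified downstream middleware: it fails fast when nothing upstream has
// put searchParams on the context.
function withFormatCheckSketch (handler) {
  return (request, env, ctx) => {
    if (!ctx.searchParams) throw new Error('missing URL search params')
    return handler(request, env, ctx)
  }
}

// Innermost handler: reports which format was requested.
const reportFormat = (request, env, ctx) => ctx.searchParams.get('format')

// The upstream middleware wraps the downstream one, so it runs first.
const composedSketch = withSearchParamsSketch(withFormatCheckSketch(reportFormat))
const requestSketch = { url: 'https://gateway.example/ipfs/cid?format=raw' }
```

Calling `composedSketch(requestSketch, {}, {})` reaches the inner handler, whereas calling `withFormatCheckSketch(reportFormat)` directly, without the upstream middleware, throws the "missing URL search params" error.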

if (searchParams.get('format') === 'raw' || headers.get('Accept')?.includes('application/vnd.ipld.raw')) {

I'm not sure I understand why the default content type of application/octet-stream would be refused here. I would expect that specifying the raw codec in the CID is enough of a signal, without requiring anything in addition.

Member Author
@alanshaw, May 15, 2024

This middleware is designed to handle the request only if ?format=raw is in the query string or Accept: application/vnd.ipld.raw is in the HTTP headers.

I will make this handler also handle requests where the CID codec is raw when I can, but that requires some other changes to keep parity with what the unixfs handler does when a raw block is requested (mainly content type sniffing).

return await handleBlock(request, env, ctx)
}
return handler(request, env, ctx) // pass to other handlers
}
if (searchParams.get('format') === 'car' || headers.get('Accept')?.includes('application/vnd.ipld.car')) {
return await handleCar(request, env, ctx)
}

/**
* @type {import('@web3-storage/gateway-lib').Middleware<DagContext & IpfsUrlContext, DagContext & IpfsUrlContext, Environment>}
*/
export function withFormatCarHandler (handler) {
return async (request, env, ctx) => {
const { headers } = request
const { searchParams } = ctx
if (!searchParams) throw new Error('missing URL search params')
if (searchParams.get('format') === 'car' || headers.get('Accept')?.includes('application/vnd.ipld.car')) {
return await handleCar(request, env, ctx)
}
return handler(request, env, ctx) // pass to other handlers

Member Author

Note: this single handler was split into 2 (well, 3 really) to allow the ?format=raw handler to run before the middleware that rejects requests that have HTTP Range headers.

Can you please add this as a code comment? I'm also not sure I fully understand what you mean just yet.

}
return await handleUnixfs(request, env, ctx)
}
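
The ordering concern raised in the review thread above (the ?format=raw handler must run before the middleware that rejects Range headers) can be illustrated with a small sketch. Everything here is a simplified stand-in under stated assumptions: `compose`, `rawHandler`, `rangeUnsupported`, `carHandler`, `unixfsHandler`, and `makeRequest` are hypothetical, return descriptive strings instead of a `Response`, and read the format from a plain `ctx.format` field rather than from search params or the Accept header.

```javascript
// Hypothetical helper: the first middleware listed is the outermost, so it
// sees the request first.
const compose = (...middlewares) => handler =>
  middlewares.reduceRight((next, middleware) => middleware(next), handler)

// Stand-in for the raw handler: answers raw requests itself, Range or not.
const rawHandler = next => (request, ctx) =>
  ctx.format === 'raw'
    ? `raw block, range=${request.headers.get('Range') ?? 'none'}`
    : next(request, ctx)

// Stand-in for the middleware that refuses any request with a Range header.
const rangeUnsupported = next => (request, ctx) =>
  request.headers.get('Range') ? 'rejected: Range not supported' : next(request, ctx)

// Stand-in for the CAR handler, which sits after the Range check.
const carHandler = next => (request, ctx) =>
  ctx.format === 'car' ? 'car' : next(request, ctx)

// Stand-in for the final unixfs handler.
const unixfsHandler = () => 'unixfs'

// Same relative order as in the diff: raw, then the Range check, then CAR.
const pipeline = compose(rawHandler, rangeUnsupported, carHandler)(unixfsHandler)

// Hypothetical request stub: a Map stands in for Headers.
const makeRequest = range => ({ headers: new Map(range ? [['Range', range]] : []) })
```

With this ordering, `pipeline(makeRequest('bytes=0-99'), { format: 'raw' })` is served by the raw handler, while the same Range header on a CAR or unixfs request is rejected before those handlers run. Had the raw and CAR checks stayed in one handler after the Range check, the raw request would have been rejected too.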
45 changes: 45 additions & 0 deletions src/lib/blockstore.js
Expand Up @@ -2,9 +2,11 @@ import { readBlockHead, asyncIterableReader } from '@ipld/car/decoder'
import { base58btc } from 'multiformats/bases/base58'
import defer from 'p-defer'
import { OrderedCarBlockBatcher } from './block-batch.js'
import * as IndexEntry from './dag-index/entry.js'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('dagula').Blockstore} Blockstore
* @typedef {import('dagula').Block} Block
* @typedef {import('@cloudflare/workers-types').R2Bucket} R2Bucket
*/
Expand All @@ -16,6 +18,8 @@ const MAX_ENCODED_BLOCK_LENGTH = (1024 * 1024 * 2) + 39 + 61
* A blockstore that is backed by an R2 bucket which contains CARv2
* MultihashIndexSorted indexes alongside CAR files. It can read DAGs split
* across multiple CARs.
*
* @implements {Blockstore}
*/
export class R2Blockstore {
/**
Expand All @@ -32,6 +36,15 @@ export class R2Blockstore {
// console.log(`get ${cid}`)
const entry = await this._idx.get(cid)
if (!entry) return

if (IndexEntry.isLocated(entry)) {
const keyAndOptions = IndexEntry.toBucketGet(entry)
if (!keyAndOptions) return

const res = await this._dataBucket.get(keyAndOptions[0], keyAndOptions[1])
return res ? { cid, bytes: new Uint8Array(await res.arrayBuffer()) } : undefined
}

const carPath = `${entry.origin}/${entry.origin}.car`
const range = { offset: entry.offset }
const res = await this._dataBucket.get(carPath, { range })
Expand All @@ -51,6 +64,32 @@ export class R2Blockstore {
reader.cancel()
return { cid, bytes }
}

/** @param {UnknownLink} cid */
async stat (cid) {
const entry = await this._idx.get(cid)
if (!entry) return

// stat API exists only for blobs (i.e. location claimed)
if (IndexEntry.isLocated(entry)) {
return { size: entry.site.range.length }
}
}

/**
* @param {UnknownLink} cid
* @param {import('dagula').AbortOptions & import('dagula').RangeOptions} [options]
*/
async stream (cid, options) {
const entry = await this._idx.get(cid)
if (!entry) return

const keyAndOptions = IndexEntry.toBucketGet(entry, options)
if (!keyAndOptions) return

const res = await this._dataBucket.get(keyAndOptions[0], keyAndOptions[1])
return /** @type {ReadableStream<Uint8Array>|undefined} */ (res?.body)
}
}

export class BatchingR2Blockstore extends R2Blockstore {
Expand Down Expand Up @@ -199,6 +238,12 @@ export class BatchingR2Blockstore extends R2Blockstore {
const entry = await this._idx.get(cid)
if (!entry) return

// TODO: batch with multipart gyte range request when we switch to reading
Suggested change
// TODO: batch with multipart gyte range request when we switch to reading
// TODO: batch with multipart byte range request when we switch to reading

// from any URL.
if (IndexEntry.isLocated(entry)) {
return super.get(cid)
}

this.#batcher.add({ carCid: entry.origin, blockCid: cid, offset: entry.offset })

if (!entry.multihash) throw new Error('missing entry multihash')
Expand Down
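
The diff does not show how `IndexEntry.toBucketGet` turns a requested byte range into bucket read options, so the following is only a sketch of one way such a translation could work, not the PR's implementation. It assumes a blob located at a known `offset` and `length` inside a bucket object, and a requested range given as a first and optional last byte (both inclusive) relative to the blob. `toR2Range` and its parameter shapes are hypothetical. The result uses the `{ offset, length }` form that an R2 `get` call accepts in its `range` option.

```javascript
/**
 * Translate a blob-relative byte range into an absolute range within the
 * bucket object that contains the blob.
 *
 * @param {{ offset: number, length: number }} blob where the blob sits in the object
 * @param {[number, number?]} [range] first and optional last byte, inclusive
 * @returns {{ offset: number, length: number }}
 */
function toR2Range (blob, range) {
  // No range requested: read the whole blob.
  const first = range?.[0] ?? 0
  // No last byte given: read to the end of the blob.
  const last = range?.[1] ?? blob.length - 1
  if (first < 0 || last >= blob.length || first > last) {
    throw new RangeError(`unsatisfiable range: ${first}-${last}`)
  }
  return { offset: blob.offset + first, length: last - first + 1 }
}

// A blob of 50 bytes starting at byte 100 of its object (made-up numbers).
const exampleBlob = { offset: 100, length: 50 }
const exampleRange = toR2Range(exampleBlob, [10, 19]) // { offset: 110, length: 10 }
```

The returned object could then be passed as `{ range: exampleRange }` in the options of a bucket `get` call, which is roughly the shape that `stream` forwards via `keyAndOptions[1]`.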