Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: support byte range for raw block requests #57

Merged
merged 4 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
994 changes: 460 additions & 534 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@
"author": "Alan Shaw",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@httpland/range-parser": "^1.2.0",
"@ipld/car": "^5.2.6",
"@web3-storage/handlebars": "^1.0.0",
"bytes": "^3.1.2",
"chardet": "^2.0.0",
"dagula": "^7.3.0",
"dagula": "github:web3-storage/dagula#feat/add-block-streaming-interface",
"magic-bytes.js": "^1.8.0",
"mrmime": "^1.0.1",
"multiformats": "^13.0.1",
"multipart-byte-range": "^2.0.1",
"timeout-abort-controller": "^3.0.0",
"uint8arrays": "^5.0.1"
},
Expand Down
14 changes: 11 additions & 3 deletions src/bindings.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { CID } from 'multiformats/cid'
import type { UnixFSEntry } from 'ipfs-unixfs-exporter'
import type { IDagula } from 'dagula'
import type { BlockService, DagService, UnixfsService } from 'dagula'
import type { TimeoutController } from 'timeout-abort-controller'

export {}
Expand All @@ -23,8 +23,16 @@ export interface TimeoutControllerContext extends Context {
timeoutController: TimeoutController
}

export interface DagulaContext extends Context {
dagula: IDagula
export interface BlockContext extends Context {
blocks: BlockService
}

export interface DagContext extends Context {
dag: DagService
}

export interface UnixfsContext extends Context {
unixfs: UnixfsService
}

export interface UnixfsEntryContext extends Context {
Expand Down
100 changes: 90 additions & 10 deletions src/handlers/block.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
/* eslint-env browser */
import { MultipartByteRange } from 'multipart-byte-range'
import { decodeRangeHeader, resolveRange } from '../util/range.js'
import { HttpError } from '../util/errors.js'

/**
* @typedef {import('../bindings.js').IpfsUrlContext & import('../bindings.js').DagulaContext & { timeoutController?: import('../bindings.js').TimeoutControllerContext['timeoutController'] }} BlockHandlerContext
* @typedef {import('../bindings.js').IpfsUrlContext & import('../bindings.js').BlockContext & import('../bindings.js').UnixfsContext & { timeoutController?: import('../bindings.js').TimeoutControllerContext['timeoutController'] }} BlockHandlerContext
*/

/** @type {import('../bindings.js').Handler<BlockHandlerContext>} */
export async function handleBlock (request, env, ctx) {
const { dataCid, path, timeoutController: controller, dagula } = ctx
const { dataCid, path, timeoutController: controller, blocks, unixfs, searchParams } = ctx
if (!dataCid) throw new Error('missing IPFS path')
if (path == null) throw new Error('missing URL path')
if (!dagula) throw new Error('missing dagula instance')
if (!blocks) throw new Error('missing block service')
if (!unixfs) throw new Error('missing unixfs service')
if (!searchParams) throw new Error('missing search params')

/** @type {import('multiformats').CID} */
let cid
if (path && path !== '/') {
const entry = await dagula.getUnixfs(`${dataCid}${path}`, { signal: controller?.signal })
// TODO: resolve path before calling handler
const entry = await unixfs.getUnixfs(`${dataCid}${path}`, { signal: controller?.signal })
cid = entry.cid
} else {
cid = dataCid
Expand All @@ -25,23 +31,97 @@ export async function handleBlock (request, env, ctx) {
return new Response(null, { status: 304 })
}

const block = await dagula.getBlock(cid, { signal: controller?.signal })
const { searchParams } = new URL(request.url)
if (request.method === 'HEAD') {
const stat = await blocks.statBlock(cid, { signal: controller?.signal })
return new Response(undefined, {
headers: {
'Accept-Ranges': 'bytes',
'Content-Length': stat.size.toString(),
Etag: etag,
Vary: 'Accept, Range'
}
})
}
if (request.method !== 'GET') {
throw new HttpError('method not allowed', { status: 405 })
}

/** @type {import('multipart-byte-range').Range[]} */
let ranges = []
if (request.headers.has('range')) {
try {
ranges = decodeRangeHeader(request.headers.get('range') ?? '')
} catch (err) {
throw new HttpError('invalid range', { cause: err, status: 400 })
}
}

const name = searchParams.get('filename') || `${cid}.bin`
const utf8Name = encodeURIComponent(name)
// eslint-disable-next-line no-control-regex
const asciiName = encodeURIComponent(name.replace(/[^\x00-\x7F]/g, '_'))

const headers = {
'Content-Type': 'application/vnd.ipld.raw',
'X-Content-Type-Options': 'nosniff',
Etag: etag,
'Cache-Control': 'public, max-age=29030400, immutable',
'Content-Length': block.bytes.length.toString(),
'Content-Disposition': `attachment; filename="${asciiName}"; filename*=UTF-8''${utf8Name}`,
Vary: 'Accept'
Vary: 'Accept, Range'
}

if (ranges.length > 1) {
return handleMultipartRange(blocks, cid, ranges, { signal: controller?.signal, headers })
} else if (ranges.length === 1) {
return handleRange(blocks, cid, ranges[0], { signal: controller?.signal, headers })
}

// no range is effectively Range: bytes=0-
return handleRange(blocks, cid, [0], { signal: controller?.signal, headers })
}

/**
 * Serve a single byte range of a raw block.
 *
 * Responds with status 200 when the resolved range covers the whole block,
 * otherwise with status 206 and a `Content-Range` header describing the
 * slice. Any headers passed in `options.headers` are kept, except that
 * `Content-Type` and `Content-Length` are always set here.
 *
 * @param {import('dagula').BlockService} blocks
 * @param {import('multiformats').UnknownLink} cid
 * @param {import('dagula').Range} range
 * @param {{ signal?: AbortSignal, headers?: Record<string, string> }} [options]
 */
const handleRange = async (blocks, cid, range, options) => {
  const stat = await blocks.statBlock(cid, { signal: options?.signal })
  // first/last are treated as inclusive offsets: length is last - first + 1
  const [first, last] = resolveRange(range, stat.size)
  const contentLength = last - first + 1
  // anything shorter than the full block is a partial response
  const isPartial = stat.size !== contentLength

  /** @type {Record<string, string>} */
  const headers = {
    ...options?.headers,
    'Content-Type': 'application/vnd.ipld.raw',
    'Content-Length': String(contentLength)
  }

  if (isPartial) {
    headers['Content-Range'] = `bytes ${first}-${last}/${stat.size}`
  }

  // the block service receives the range as requested (not the resolved one)
  const content = await blocks.streamBlock(cid, { range, signal: options?.signal })
  return new Response(content, { status: isPartial ? 206 : 200, headers })
}

/**
 * Respond to a request for several byte ranges of one block with a single
 * multipart payload. The response status is always 206; headers supplied by
 * the multipart source take precedence over those passed in `options.headers`.
 *
 * @param {import('dagula').BlockService} blocks
 * @param {import('multiformats').UnknownLink} cid
 * @param {import('dagula').Range[]} ranges
 * @param {{ signal?: AbortSignal, headers?: Record<string, string> }} [options]
 */
const handleMultipartRange = async (blocks, cid, ranges, options) => {
  const { size: totalSize } = await blocks.statBlock(cid, { signal: options?.signal })

  const multipart = new MultipartByteRange(
    ranges,
    /** @param {import('dagula').AbsoluteRange} range */
    (range) => blocks.streamBlock(cid, { signal: options?.signal, range }),
    { totalSize, contentType: 'application/vnd.ipld.raw' }
  )

  const headers = { ...options?.headers, ...multipart.headers }
  return new Response(multipart, { status: 206, headers })
}
22 changes: 12 additions & 10 deletions src/handlers/car.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { toReadableStream } from '../util/streams.js'
import { HttpError } from '../util/errors.js'

/**
* @typedef {import('../bindings.js').IpfsUrlContext & import('../bindings.js').DagulaContext & { timeoutController?: import('../bindings.js').TimeoutControllerContext['timeoutController'] }} CarHandlerContext
* @typedef {import('../bindings.js').IpfsUrlContext & import('../bindings.js').DagContext & { timeoutController?: import('../bindings.js').TimeoutControllerContext['timeoutController'] }} CarHandlerContext
* @typedef {import('multiformats').CID} CID
* @typedef {{ version: 1|2, order: import('dagula').BlockOrder, dups: boolean }} CarParams
*/
Expand All @@ -14,10 +14,14 @@ const DefaultCarParams = { version: 1, order: 'unk', dups: true }

/** @type {import('../bindings.js').Handler<CarHandlerContext>} */
export async function handleCar (request, env, ctx) {
const { dataCid, path, timeoutController: controller, dagula, searchParams } = ctx
const { dataCid, path, timeoutController: controller, dag, searchParams } = ctx
if (!dataCid) throw new Error('missing IPFS path')
if (path == null) throw new Error('missing URL path')
if (!dagula) throw new Error('missing dagula instance')
if (!dag) throw new Error('missing DAG service')

if (request.method !== 'GET') {
throw new HttpError('method not allowed', { status: 405 })
}

const dagScope = getDagScope(searchParams)
const entityBytes = getEntityBytes(searchParams)
Expand All @@ -40,7 +44,7 @@ export async function handleCar (request, env, ctx) {
const { writer, out } = CarWriter.create(dataCid)
;(async () => {
try {
for await (const block of dagula.getPath(`${dataCid}${path}`, { dagScope, entityBytes, order, signal: controller?.signal })) {
for await (const block of dag.getPath(`${dataCid}${path}`, { dagScope, entityBytes, order, signal: controller?.signal })) {
await writer.put(block)
}
} catch (/** @type {any} */ err) {
Expand Down Expand Up @@ -83,7 +87,7 @@ function getDagScope (searchParams) {

/**
* @param {URLSearchParams} searchParams
* @returns {import('dagula').ByteRange|undefined}
* @returns {import('dagula').Range|undefined}
*/
function getEntityBytes (searchParams) {
const value = searchParams.get('entity-bytes')
Expand All @@ -97,17 +101,15 @@ function getEntityBytes (searchParams) {
if (isNaN(from)) {
throw new HttpError(`invalid entity-bytes: ${value}`, { status: 400 })
}
/** @type {number|'*'} */
/** @type {number|undefined} */
let to
if (parts[1] === '*') {
to = parts[1]
} else {
if (parts[1] !== '*') {
to = parseInt(parts[1])
if (isNaN(to)) {
throw new HttpError(`invalid entity-bytes: ${value}`, { status: 400 })
}
}
return { from, to }
return to ? [from, to] : [from]
}

/**
Expand Down
16 changes: 10 additions & 6 deletions src/handlers/unixfs-dir.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import bytes from 'bytes'
import './templates/bundle.cjs'
import { toReadableStream } from '../util/streams.js'
import { handleUnixfsFile } from './unixfs-file.js'
import { HttpError } from '../util/errors.js'

/**
* @typedef {import('../bindings.js').UnixfsEntryContext & import('../bindings.js').IpfsUrlContext & import('../bindings.js').DagulaContext & { timeoutController?: import('../bindings.js').TimeoutControllerContext['timeoutController'] }} UnixfsDirectoryHandlerContext
* @typedef {import('../bindings.js').UnixfsEntryContext & import('../bindings.js').IpfsUrlContext & import('../bindings.js').UnixfsContext & { timeoutController?: import('../bindings.js').TimeoutControllerContext['timeoutController'] }} UnixfsDirectoryHandlerContext
*/

/**
Expand Down Expand Up @@ -44,15 +45,15 @@ const knownIcons = Object.fromEntries([

/** @type {import('../bindings.js').Handler<UnixfsDirectoryHandlerContext>} */
export async function handleUnixfsDir (request, env, ctx) {
const { unixfsEntry: entry, timeoutController: controller, dagula, dataCid, path } = ctx
if (!entry) throw new Error('missing unixfs entry')
if (!entry.type.includes('directory')) throw new Error('non unixfs directory entry')
if (!dagula) throw new Error('missing dagula instance')
const { unixfsEntry: entry, timeoutController: controller, unixfs, dataCid, path } = ctx
if (!entry) throw new Error('missing UnixFS entry')
if (!entry.type.includes('directory')) throw new Error('non UnixFS directory entry')
if (!unixfs) throw new Error('missing UnixFS service')

// serve index.html if directory contains one
try {
const indexPath = `${dataCid}${path}${path.endsWith('/') ? '' : '/'}index.html`
const fileEntry = await dagula.getUnixfs(indexPath, { signal: controller?.signal })
const fileEntry = await unixfs.getUnixfs(indexPath, { signal: controller?.signal })
ctx.unixfsEntry = fileEntry
return handleUnixfsFile(request, env, ctx)
} catch (/** @type {any} */ err) {
Expand All @@ -67,6 +68,9 @@ export async function handleUnixfsDir (request, env, ctx) {
if (request.method === 'HEAD') {
return new Response(null, { headers })
}
if (request.method !== 'GET') {
throw new HttpError('method not allowed', { status: 405 })
}

const isSubdomain = new URL(request.url).hostname.includes('.ipfs.')
/** @param {string} p CID path like "<cid>[/optional/path]" */
Expand Down
12 changes: 10 additions & 2 deletions src/handlers/unixfs-file.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint-env browser */
import { toReadableStream } from '../util/streams.js'
import { detectContentType } from '../util/mime.js'
import { HttpError } from '../util/errors.js'

/**
* @typedef {import('../bindings.js').UnixfsEntryContext} UnixfsFileHandlerContext
Expand All @@ -9,9 +10,9 @@ import { detectContentType } from '../util/mime.js'
/** @type {import('../bindings.js').Handler<UnixfsFileHandlerContext>} */
export async function handleUnixfsFile (request, env, ctx) {
const { unixfsEntry: entry } = ctx
if (!entry) throw new Error('missing unixfs entry')
if (!entry) throw new Error('missing UnixFS entry')
if (entry.type !== 'file' && entry.type !== 'raw' && entry.type !== 'identity') {
throw new Error('non unixfs file entry')
throw new Error('non UnixFS file entry')
}

const etag = `"${entry.cid}"`
Expand All @@ -26,6 +27,13 @@ export async function handleUnixfsFile (request, env, ctx) {
'Content-Length': entry.size.toString()
}

if (request.method === 'HEAD') {
return new Response(null, { headers })
}
if (request.method !== 'GET') {
throw new HttpError('method not allowed', { status: 405 })
}

console.log('unixfs root', entry.cid.toString())
const contentIterator = entry.content()[Symbol.asyncIterator]()
const { done, value: firstChunk } = await contentIterator.next()
Expand Down
8 changes: 4 additions & 4 deletions src/handlers/unixfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import { handleUnixfsFile } from './unixfs-file.js'
import { HttpError } from '../util/errors.js'

/**
* @typedef {import('../bindings.js').IpfsUrlContext & import('../bindings.js').DagulaContext & { timeoutController?: import('../bindings.js').TimeoutControllerContext['timeoutController'] }} UnixfsHandlerContext
* @typedef {import('../bindings.js').IpfsUrlContext & import('../bindings.js').UnixfsContext & { timeoutController?: import('../bindings.js').TimeoutControllerContext['timeoutController'] }} UnixfsHandlerContext
*/

/** @type {import('../bindings.js').Handler<UnixfsHandlerContext>} */
export async function handleUnixfs (request, env, ctx) {
const { dataCid, path, timeoutController: controller, dagula } = ctx
const { dataCid, path, timeoutController: controller, unixfs } = ctx
if (!dataCid) throw new Error('missing data CID')
if (path == null) throw new Error('missing URL pathname')
if (!dagula) throw new Error('missing dagula instance')
if (!unixfs) throw new Error('missing UnixFS context')

const entry = await dagula.getUnixfs(`${dataCid}${path}`, { signal: controller?.signal })
const entry = await unixfs.getUnixfs(`${dataCid}${path}`, { signal: controller?.signal })

if (!['file', 'raw', 'directory', 'hamt-directory', 'identity'].includes(entry.type)) {
throw new HttpError('unsupported entry type', { status: 501 })
Expand Down
28 changes: 19 additions & 9 deletions src/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { parseCid, tryParseCid } from './util/cid.js'
/** @typedef {import('./bindings.js').Context} Context */

const CF_CACHE_MAX_OBJECT_SIZE = 512 * Math.pow(1024, 2) // 512MB to bytes
const HTTP_PARTIAL_CONTENT = 206

/**
* Creates a fresh context object that can be mutated by the request.
Expand Down Expand Up @@ -92,18 +93,27 @@ export function withErrorHandler (handler) {
}

/**
 * Creates a middleware that validates the request uses one of the given HTTP
 * methods. A request using any other method is rejected by throwing an
 * `HttpError` with status 405; an allowed request is passed to the wrapped
 * handler unchanged and the handler's result is returned.
 *
 * @param {...string} method Allowed HTTP method(s).
 * @returns {import('./bindings.js').Middleware<Context>}
 */
export function createWithHttpMethod (...method) {
  return (handler) => {
    return (request, env, ctx) => {
      if (!method.includes(request.method)) {
        throw new HttpError('method not allowed', { status: 405 })
      }
      return handler(request, env, ctx)
    }
  }
}

/**
 * Validates the request uses the HTTP GET method. Built by calling
 * `createWithHttpMethod` with 'GET' as the only allowed method.
 * @type {import('./bindings.js').Middleware<Context>}
 */
export const withHttpGet = createWithHttpMethod('GET')

/**
* Extracts the data CID, the path and search params from the URL.
* @type {import('./bindings.js').Middleware<import('./bindings.js').IpfsUrlContext>}
Expand Down Expand Up @@ -199,8 +209,8 @@ export function withCdnCache (handler) {

response = await handler(request, env, ctx)

// cache the response if success status
if (response.ok && !response.headers.has('Content-Range')) {
// cache the response if success status, and not range request
if (response.ok && response.status !== HTTP_PARTIAL_CONTENT) {
const contentLength = response.headers.get('Content-Length')
if (contentLength && parseInt(contentLength) < CF_CACHE_MAX_OBJECT_SIZE) {
ctx.waitUntil(cache.put(request, response.clone()))
Expand Down
Loading
Loading