feat: generate sharded DAG index on client and invoke w index/add #1451

Merged
merged 15 commits into from
May 15, 2024
1 change: 1 addition & 0 deletions packages/access-client/src/access.js
@@ -325,6 +325,7 @@ export const toCapabilities = (access) => {
export const spaceAccess = {
'space/*': {},
'blob/*': {},
'index/*': {},
'store/*': {},
'upload/*': {},
'access/*': {},
4 changes: 3 additions & 1 deletion packages/upload-api/src/blob/accept.js
@@ -35,7 +35,9 @@ export function blobAcceptProvider(context) {
const url =
/** @type {API.URI<'https:'>} */
(
`https://w3s.link/ipfs/${content}?format=raw&origin=${encodeURIComponent(`r2://${R2_REGION}/${R2_BUCKET}`)}`
`https://w3s.link/ipfs/${content}?format=raw&origin=${encodeURIComponent(
`r2://${R2_REGION}/${R2_BUCKET}`
)}`
)

const locationClaim = await Assert.location.delegate({
12 changes: 12 additions & 0 deletions packages/upload-api/test/storage/blobs-storage.js
@@ -135,6 +135,18 @@ export class BlobsStorage {
*/
async stream(digest) {
const key = this.#bucketPath(digest)
if (!this.server) {
const url = new URL(key, this.baseURL)
const res = await fetch(url.toString())
if (res.status === 404) return error(new BlobNotFound(digest))
if (!res.ok || !res.body) {
throw new Error(
`serverless blob storage failed to fetch from: ${url} status: ${res.status}`
)
}
return ok(res.body)
}

const bytes = this.content.get(key)
if (!bytes) return error(new BlobNotFound(digest))
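
The serverless branch added to `stream` above has three outcomes: a 404 becomes a not-found error result, any other unusable response throws, and a successful response returns its body. Here is a minimal sketch of that control flow, assuming a hypothetical `fetchBlob` helper that receives the `fetch` implementation as a parameter so it can be exercised offline. The `BlobNotFound` class and the `{ ok }` / `{ error }` result shape are simplified stand-ins, not the ones from the repository.

```js
// Simplified stand-in for the repository's BlobNotFound error (assumption).
class BlobNotFound extends Error {
  constructor(key) {
    super(`blob not found: ${key}`)
    this.name = 'BlobNotFound'
  }
}

/**
 * Hypothetical helper mirroring the serverless branch of `stream`.
 * @param {string} key path of the blob, resolved against baseURL
 * @param {string|URL} baseURL
 * @param {Function} fetchImpl injected so the sketch can run without a network
 */
async function fetchBlob(key, baseURL, fetchImpl = fetch) {
  const url = new URL(key, baseURL)
  const res = await fetchImpl(url.toString())
  // A missing blob is an expected outcome, so it is returned, not thrown.
  if (res.status === 404) return { error: new BlobNotFound(key) }
  // Any other response without a usable body is unexpected: throw.
  if (!res.ok || !res.body) {
    throw new Error(`failed to fetch from: ${url} status: ${res.status}`)
  }
  return { ok: res.body }
}
```

Returning the not-found case as a value while throwing on other failures lets callers handle a missing blob as ordinary control flow and treat everything else as a fault.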

134 changes: 96 additions & 38 deletions packages/upload-client/README.md
@@ -77,16 +77,23 @@ const cid = await uploadDirectory(conf, [
The buffering API loads all data into memory so is suitable only for small files. The root data CID is derived from the data before any transfer to the service takes place.

```js
import { UnixFS, CAR, Store, Upload } from '@web3-storage/upload-client'
import { UnixFS, CAR, Blob, Index, Upload } from '@web3-storage/upload-client'
import * as BlobIndexUtil from '@web3-storage/blob-index/util'
import * as Link from 'multiformats/link'

// Encode a file as a DAG, get back a root data CID and a set of blocks
const { cid, blocks } = await UnixFS.encodeFile(file)
// Encode the DAG as a CAR file
const car = await CAR.encode(blocks, cid)
// Store the CAR file to the service
const carCID = await Store.add(conf, car)
const carDigest = await Blob.add(conf, car)
// Create an index
const index = await BlobIndexUtil.fromShardArchives(cid, [new Uint8Array(await car.arrayBuffer())])
// Store the index to the service
const indexDigest = await Blob.add(conf, (await index.archive()).ok)
await Index.add(conf, Link.create(CAR.code, indexDigest))
// Register an "upload" - a root CID contained within the passed CAR file(s)
await Upload.add(conf, cid, [carCID])
await Upload.add(conf, cid, [Link.create(CAR.code, carDigest)])
```

#### Streaming API
@@ -97,11 +104,14 @@ This API offers streaming DAG generation, allowing CAR "shards" to be sent to th
import {
UnixFS,
CAR,
ShardingStream,
Store,
Blob,
Index,
Upload,
} from '@web3-storage/upload-client'
import { ShardedDAGIndex } from '@web3-storage/blob-index'
import * as Link from 'multiformats/link'

let rootCID
const carCIDs = []
const shardIndexes = []
// Encode a file as a DAG, get back a readable stream of blocks.
await UnixFS.createFileEncoderStream(file)
// Pipe blocks to a stream that yields CAR files - shards of the DAG.
@@ -111,13 +121,29 @@ await UnixFS.createFileEncoderStream(file)
.pipeTo(
new WritableStream({
async write (car) {
const carCID = await Store.add(conf, car)
carCIDs.push(carCID)
const carDigest = await Blob.add(conf, car)
carCIDs.push(Link.create(CAR.code, carDigest))

// add the CAR shard itself to the slices
car.slices.set(carDigest, [0, car.size])
shardIndexes.push(car.slices)

rootCID = rootCID || car.roots[0]
},
})
)

// Combine the shard indexes to create the complete DAG index
const index = ShardedDAGIndex.create(rootCID)
for (const [i, shard] of carCIDs.entries()) {
const slices = shardIndexes[i]
index.shards.set(shard.multihash, slices)
}

// Store the index to the service
const indexDigest = await Blob.add(conf, (await index.archive()).ok)
await Index.add(conf, Link.create(CAR.code, indexDigest))

// Register an "upload" - a root CID contained within the passed CAR file(s)
await Upload.add(conf, rootCID, carCIDs)
```
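
The index-building loop in the streaming example above amounts to a two-level map: shard digest to slices, and each slice digest to an `[offset, length]` pair. Here is a minimal sketch of that data structure using plain `Map`s, with strings standing in for multihash digests. `createIndex`, `addShard` and `locate` are hypothetical names for illustration and are not the `@web3-storage/blob-index` API.

```js
/**
 * Hypothetical in-memory model of a sharded DAG index.
 * Keys are strings here; the real index keys on multihash digests.
 * @param {string} content root CID of the DAG (a string in this sketch)
 */
function createIndex(content) {
  return { content, shards: new Map() }
}

/** Record the slices (block digest -> [offset, length]) found in one shard. */
function addShard(index, shardDigest, slices) {
  index.shards.set(shardDigest, new Map(slices))
  return index
}

/** Find which shard holds a block and where, or undefined if not indexed. */
function locate(index, blockDigest) {
  for (const [shard, slices] of index.shards) {
    const position = slices.get(blockDigest)
    if (position) return { shard, offset: position[0], length: position[1] }
  }
  return undefined
}

// Usage: two shards, each contributing its own slices.
const index = createIndex('root')
addShard(index, 'shard-0', [
  ['block-a', [59, 100]],
  ['block-b', [196, 50]],
])
addShard(index, 'shard-1', [['root', [59, 20]]])
```

With this shape, a reader that knows only a block digest can find the shard to fetch and the byte range to request from it.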
@@ -135,12 +161,13 @@ await Upload.add(conf, rootCID, carCIDs)
- [`uploadDirectory`](#uploaddirectory)
- [`uploadFile`](#uploadfile)
- [`uploadCAR`](#uploadcar)
- [`Blob.add`](#blobadd)
- [`Blob.list`](#bloblist)
- [`Blob.remove`](#blobremove)
- [`CAR.BlockStream`](#carblockstream)
- [`CAR.encode`](#carencode)
- [`Index.add`](#indexadd)
- [`ShardingStream`](#shardingstream)
- [`Store.add`](#storeadd)
- [`Store.list`](#storelist)
- [`Store.remove`](#storeremove)
- [`UnixFS.createDirectoryEncoderStream`](#unixfscreatedirectoryencoderstream)
- [`UnixFS.createFileEncoderStream`](#unixfscreatefileencoderstream)
- [`UnixFS.encodeDirectory`](#unixfsencodedirectory)
@@ -178,7 +205,7 @@ function uploadDirectory(

Uploads a directory of files to the service and returns the root data CID for the generated DAG. All files are added to a container directory, with paths in file names preserved.

Required delegated capability proofs: `store/add`, `upload/add`
Required delegated capability proofs: `blob/add`, `index/add`, `upload/add`, `filecoin/offer`

More information: [`InvocationConfig`](#invocationconfig), [`ShardStoredCallback`](#shardstoredcallback)

@@ -200,7 +227,7 @@ function uploadFile(

Uploads a file to the service and returns the root data CID for the generated DAG.

Required delegated capability proofs: `store/add`, `upload/add`
Required delegated capability proofs: `blob/add`, `index/add`, `upload/add`, `filecoin/offer`

More information: [`InvocationConfig`](#invocationconfig)

@@ -221,12 +248,58 @@ function uploadCAR(
): Promise<CID>
```

Uploads a CAR file to the service. The difference between this function and [Store.add](#storeadd) is that the CAR file is automatically sharded and an "upload" is registered (see [`Upload.add`](#uploadadd)), linking the individual shards. Use the `onShardStored` callback to obtain the CIDs of the CAR file shards.
Uploads a CAR file to the service. The difference between this function and [Blob.add](#blobadd) is that the CAR file is automatically sharded, an index is generated, uploaded and registered (see [`Index.add`](#indexadd)) and finally an "upload" is registered (see [`Upload.add`](#uploadadd)), linking the individual shards. Use the `onShardStored` callback to obtain the CIDs of the CAR file shards.

Required delegated capability proofs: `store/add`, `upload/add`
Required delegated capability proofs: `blob/add`, `index/add`, `upload/add`, `filecoin/offer`

More information: [`InvocationConfig`](#invocationconfig), [`ShardStoredCallback`](#shardstoredcallback)

### `Blob.add`

```ts
function add(
conf: InvocationConfig,
blob: Blob,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<MultihashDigest>
```

Store a blob to the service.

Required delegated capability proofs: `blob/add`

More information: [`InvocationConfig`](#invocationconfig)

### `Blob.list`

```ts
function list(
conf: InvocationConfig,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<ListResponse<BlobListResult>>
```

List blobs stored in the space.

Required delegated capability proofs: `blob/list`

More information: [`InvocationConfig`](#invocationconfig)

### `Blob.remove`

```ts
function remove(
conf: InvocationConfig,
digest: MultihashDigest,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<void>
```

Remove a stored blob by multihash digest.

Required delegated capability proofs: `blob/remove`

More information: [`InvocationConfig`](#invocationconfig)

### `CAR.BlockStream`

```ts
@@ -252,46 +325,31 @@ const { cid, blocks } = await UnixFS.encodeFile(new Blob(['data']))
const car = await CAR.encode(blocks, cid)
```

### `ShardingStream`

```ts
class ShardingStream extends TransformStream<Block, CARFile>
```

Shard a set of blocks into a set of CAR files. The last block written to the stream is assumed to be the DAG root and becomes the CAR root CID for the last CAR output.

More information: [`CARFile`](#carfile)

### `Store.list`
### `Index.add`

```ts
function list(
function add(
conf: InvocationConfig,
index: CID,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<ListResponse<StoreListResult>>
): Promise<IndexAddResponse>
```

List CAR files stored by the issuer.
Register an "index" with the service. The `index` CID should be the CID of a CAR file containing an index as defined by [w3-index](https://github.com/w3s-project/specs/blob/main/w3-index.md).

Required delegated capability proofs: `store/list`
Required delegated capability proofs: `index/add`

More information: [`InvocationConfig`](#invocationconfig)

### `Store.remove`
### `ShardingStream`

```ts
function remove(
conf: InvocationConfig,
link: CID,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<void>
class ShardingStream extends TransformStream<Block, CARFile>
```

Remove a stored CAR file by CAR CID.

Required delegated capability proofs: `store/remove`
Shard a set of blocks into a set of CAR files. The last block written to the stream is assumed to be the DAG root and becomes the CAR root CID for the last CAR output.

More information: [`InvocationConfig`](#invocationconfig)
More information: [`CARFile`](#carfile)

### `UnixFS.createDirectoryEncoderStream`

3 changes: 2 additions & 1 deletion packages/upload-client/package.json
@@ -75,10 +75,11 @@
"@ipld/dag-cbor": "^9.0.6",
"@ipld/dag-ucan": "^3.4.0",
"@ipld/unixfs": "^2.1.1",
"@ucanto/core": "^10.0.1",
"@ucanto/client": "^9.0.1",
"@ucanto/core": "^10.0.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/transport": "^9.1.1",
"@web3-storage/blob-index": "workspace:^",
"@web3-storage/capabilities": "workspace:^",
"@web3-storage/data-segment": "^5.1.0",
"@web3-storage/filecoin-client": "workspace:^",
13 changes: 10 additions & 3 deletions packages/upload-client/src/car.js
@@ -6,8 +6,10 @@ import varint from 'varint'
* @typedef {import('@ipld/unixfs').Block} Block
*/

export const code = 0x0202

/** Byte length of a CBOR encoded CAR header with zero roots. */
const NO_ROOTS_HEADER_LENGTH = 17
const NO_ROOTS_HEADER_LENGTH = 18
Member Author

🤦 FML this was wrong. Either it was wrong the whole time or @ipld/car started encoding the header differently.

Contributor

nit: would be great to just export it from @ipld/car


/** @param {import('./types.js').AnyLink} [root] */
export function headerEncodingLength(root) {
@@ -18,10 +20,15 @@ export function headerEncodingLength(root) {
}

/** @param {Block} block */
export function blockEncodingLength(block) {
export function blockHeaderEncodingLength(block) {
const payloadLength = block.cid.bytes.length + block.bytes.length
const varintLength = varint.encodingLength(payloadLength)
return varintLength + payloadLength
return varintLength + block.cid.bytes.length
}

/** @param {Block} block */
export function blockEncodingLength(block) {
return blockHeaderEncodingLength(block) + block.bytes.length
}

/**
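
The length helpers changed in `car.js` above split a block's CAR framing into a header part (the varint length prefix plus the CID bytes) and the payload. Here is a minimal sketch of that arithmetic, using a hand-written `varintLength` in place of the `varint` package and zero-filled byte arrays in place of real blocks. The last lines count the bytes of a zero-root CAR header by hand. Reading the new constant `18` as 17 CBOR bytes plus a 1-byte varint prefix is an interpretation, not something the PR states.

```js
/** Number of bytes an unsigned LEB128 varint needs for `n` (n < 2^53). */
function varintLength(n) {
  let length = 1
  while (n >= 0x80) {
    n = Math.floor(n / 0x80)
    length++
  }
  return length
}

/** Bytes that precede the block payload: varint prefix + CID bytes. */
function blockHeaderEncodingLength(block) {
  const payloadLength = block.cid.bytes.length + block.bytes.length
  return varintLength(payloadLength) + block.cid.bytes.length
}

/** Total bytes the block occupies in the CAR. */
function blockEncodingLength(block) {
  return blockHeaderEncodingLength(block) + block.bytes.length
}

// Stand-in block: a 36-byte CID (CIDv1 with a sha2-256 multihash) and 200 data bytes.
const block = { cid: { bytes: new Uint8Array(36) }, bytes: new Uint8Array(200) }
// The varint encodes 36 + 200 = 236, which needs 2 bytes.
const headerLength = blockHeaderEncodingLength(block) // 2 + 36
const totalLength = blockEncodingLength(block) // 38 + 200

// Zero-root CAR header, DAG-CBOR encoded as { roots: [], version: 1 }:
// map(2) + text(5) "roots" + array(0) + text(7) "version" + uint 1
const cborHeaderLength = 1 + (1 + 5) + 1 + (1 + 7) + 1
const noRootsHeaderLength = varintLength(cborHeaderLength) + cborHeaderLength
```

Knowing the header length separately gives the offset at which a block's data starts inside a shard, which is the kind of position an index entry records.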
63 changes: 63 additions & 0 deletions packages/upload-client/src/dag-index.js
@@ -0,0 +1,63 @@
import * as IndexCapabilities from '@web3-storage/capabilities/index'
import { SpaceDID } from '@web3-storage/capabilities/utils'
import retry from 'p-retry'
import { servicePrincipal, connection } from './service.js'
import { REQUEST_RETRIES } from './constants.js'

/**
* Register an "index" with the service. The issuer needs the `index/add`
* delegated capability.
*
* Required delegated capability proofs: `index/add`
*
* @param {import('./types.js').InvocationConfig} conf Configuration
* for the UCAN invocation. An object with `issuer`, `with` and `proofs`.
*
* The `issuer` is the signing authority that is issuing the UCAN
* invocation(s). It is typically the user _agent_.
*
* The `with` is the resource the invocation applies to. It is typically the
* DID of a space.
*
* The `proofs` are a set of capability delegations that prove the issuer
* has the capability to perform the action.
*
* The issuer needs the `index/add` delegated capability.
* @param {import('./types.js').CARLink} index Index to store.
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('./types.js').IndexAddSuccess>}
*/
export async function add(
{ issuer, with: resource, proofs, audience },
index,
options = {}
) {
/* c8 ignore next */
const conn = options.connection ?? connection
const result = await retry(
async () => {
return await IndexCapabilities.add
.invoke({
issuer,
/* c8 ignore next */
audience: audience ?? servicePrincipal,
with: SpaceDID.from(resource),
nb: { index },
proofs,
})
.execute(conn)
},
{
onFailedAttempt: console.warn,
retries: options.retries ?? REQUEST_RETRIES,
}
)

if (!result.out.ok) {
throw new Error(`failed ${IndexCapabilities.add.can} invocation`, {
cause: result.out.error,
})
}

return result.out.ok
}
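
The `add` function above retries the invocation, then turns a failed receipt into a thrown `Error` whose `cause` carries the service error. Here is a minimal sketch of that retry-then-unwrap pattern, assuming a hand-rolled `retry` helper in place of `p-retry` (no backoff, for brevity) and a hypothetical `invokeWithRetry` wrapper. The receipt shape `{ out: { ok } | { error } }` is taken from the code above.

```js
/**
 * Tiny stand-in for p-retry (assumption: immediate retries, no backoff).
 * Calls `fn` once, then up to `retries` more times if it rejects.
 */
async function retry(fn, { retries = 3, onFailedAttempt = () => {} } = {}) {
  let lastError
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn()
    } catch (err) {
      lastError = err
      onFailedAttempt(err)
    }
  }
  throw lastError
}

/**
 * Hypothetical wrapper: execute an invocation with retries, then unwrap it.
 * @param {string} can ability name, used only in the error message
 * @param {() => Promise<{ out: { ok?: unknown, error?: unknown } }>} execute
 * @param {{ retries?: number }} [options]
 */
async function invokeWithRetry(can, execute, options = {}) {
  const receipt = await retry(execute, { retries: options.retries ?? 3 })
  if (!receipt.out.ok) {
    // Rejections are retried above; an error receipt from the service is not.
    throw new Error(`failed ${can} invocation`, { cause: receipt.out.error })
  }
  return receipt.out.ok
}
```

Only rejected promises trigger another attempt here. A receipt that resolves with an error is surfaced immediately, which matches the placement of the `result.out.ok` check outside the retried callback in the original.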