Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow defaultDagBuilder to be used with custom builders #413

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions packages/ipfs-unixfs-importer/src/dag-builder/dir.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
import { encode, prepare } from '@ipld/dag-pb'
import { UnixFS } from 'ipfs-unixfs'
import { persist } from '../utils/persist.js'
import type { Directory, InProgressImportResult, WritableStorage } from '../index.js'
import type {
Directory,
InProgressImportResult,
WritableStorage
} from '../index.js'
import type { Version } from 'multiformats/cid'

/**
 * Options passed to a `DirBuilder` when persisting a directory node.
 */
export interface DirBuilderOptions {
  /** CID version to use when persisting the directory block */
  cidVersion: Version
  /** NOTE(review): presumably used to abort the persist operation — confirm against `persist()` */
  signal?: AbortSignal
}

export const dirBuilder = async (dir: Directory, blockstore: WritableStorage, options: DirBuilderOptions): Promise<InProgressImportResult> => {
/**
 * A function that imports a directory entry into the given blockstore and
 * resolves to the in-progress import result for that directory.
 *
 * A custom implementation may be supplied via `DagBuilderOptions.dirBuilder`;
 * otherwise `defaultDirBuilder` is used.
 */
export interface DirBuilder {
  (
    dir: Directory,
    blockstore: WritableStorage,
    options: DirBuilderOptions
  ): Promise<InProgressImportResult>
}

export const defaultDirBuilder = async (
dir: Directory,
blockstore: WritableStorage,
options: DirBuilderOptions
): Promise<InProgressImportResult> => {
const unixfs = new UnixFS({
type: 'directory',
mtime: dir.mtime,
Expand Down
109 changes: 85 additions & 24 deletions packages/ipfs-unixfs-importer/src/dag-builder/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ import parallelBatch from 'it-parallel-batch'
import * as rawCodec from 'multiformats/codecs/raw'
import { CustomProgressEvent } from 'progress-events'
import { persist } from '../utils/persist.js'
import type { BufferImporter, File, InProgressImportResult, WritableStorage, SingleBlockImportResult, ImporterProgressEvents } from '../index.js'
import type {
BufferImporter,
File,
InProgressImportResult,
WritableStorage,
SingleBlockImportResult,
ImporterProgressEvents
} from '../index.js'
import type { FileLayout, Reducer } from '../layout/index.js'
import type { CID, Version } from 'multiformats/cid'
import type { ProgressOptions, ProgressEvent } from 'progress-events'
Expand All @@ -14,11 +21,18 @@ interface BuildFileBatchOptions {
blockWriteConcurrency: number
}

async function * buildFileBatch (file: File, blockstore: WritableStorage, options: BuildFileBatchOptions): AsyncGenerator<InProgressImportResult> {
async function * buildFileBatch (
file: File,
blockstore: WritableStorage,
options: BuildFileBatchOptions
): AsyncGenerator<InProgressImportResult> {
let count = -1
let previous: SingleBlockImportResult | undefined

for await (const entry of parallelBatch(options.bufferImporter(file, blockstore), options.blockWriteConcurrency)) {
for await (const entry of parallelBatch(
options.bufferImporter(file, blockstore),
options.blockWriteConcurrency
)) {
count++

if (count === 0) {
Expand All @@ -29,7 +43,7 @@ async function * buildFileBatch (file: File, blockstore: WritableStorage, option
}

continue
} else if (count === 1 && (previous != null)) {
} else if (count === 1 && previous != null) {
// we have the second block of a multiple block import so yield the first
yield {
...previous,
Expand Down Expand Up @@ -63,8 +77,10 @@ export interface LayoutLeafProgress {
path?: string
}

export type ReducerProgressEvents =
ProgressEvent<'unixfs:importer:progress:file:layout', LayoutLeafProgress>
/**
 * Progress event emitted while a file's DAG is laid out — fired via
 * `options.onProgress` with the CID and original path of each node
 * as it is persisted.
 */
export type ReducerProgressEvents = ProgressEvent<
  'unixfs:importer:progress:file:layout',
  LayoutLeafProgress
>

interface ReduceOptions extends ProgressOptions<ImporterProgressEvents> {
reduceSingleLeafToSelf: boolean
Expand All @@ -76,13 +92,24 @@ function isSingleBlockImport (result: any): result is SingleBlockImportResult {
return result.single === true
}

const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions): Reducer => {
const reduce = (
file: File,
blockstore: WritableStorage,
options: ReduceOptions
): Reducer => {
const reducer: Reducer = async function (leaves) {
if (leaves.length === 1 && isSingleBlockImport(leaves[0]) && options.reduceSingleLeafToSelf) {
if (
leaves.length === 1 &&
isSingleBlockImport(leaves[0]) &&
options.reduceSingleLeafToSelf
) {
const leaf = leaves[0]
let node: Uint8Array | PBNode = leaf.block

if (isSingleBlockImport(leaf) && (file.mtime !== undefined || file.mode !== undefined)) {
if (
isSingleBlockImport(leaf) &&
(file.mtime !== undefined || file.mode !== undefined)
) {
// only one leaf node which is a raw leaf - we have metadata so convert it into a
// UnixFS entry otherwise we'll have nowhere to store the metadata
leaf.unixfs = new UnixFS({
Expand All @@ -103,10 +130,15 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
leaf.size = BigInt(leaf.block.length)
}

options.onProgress?.(new CustomProgressEvent<LayoutLeafProgress>('unixfs:importer:progress:file:layout', {
cid: leaf.cid,
path: leaf.originalPath
}))
options.onProgress?.(
new CustomProgressEvent<LayoutLeafProgress>(
'unixfs:importer:progress:file:layout',
{
cid: leaf.cid,
path: leaf.originalPath
}
)
)

return {
cid: leaf.cid,
Expand All @@ -125,12 +157,16 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
})

const links: PBLink[] = leaves
.filter(leaf => {
.filter((leaf) => {
if (leaf.cid.code === rawCodec.code && leaf.size > 0) {
return true
}

if ((leaf.unixfs != null) && (leaf.unixfs.data == null) && leaf.unixfs.fileSize() > 0n) {
if (
leaf.unixfs != null &&
leaf.unixfs.data == null &&
leaf.unixfs.fileSize() > 0n
) {
return true
}

Expand Down Expand Up @@ -170,16 +206,24 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
const block = encode(prepare(node))
const cid = await persist(block, blockstore, options)

options.onProgress?.(new CustomProgressEvent<LayoutLeafProgress>('unixfs:importer:progress:file:layout', {
cid,
path: file.originalPath
}))
options.onProgress?.(
new CustomProgressEvent<LayoutLeafProgress>(
'unixfs:importer:progress:file:layout',
{
cid,
path: file.originalPath
}
)
)

return {
cid,
path: file.path,
unixfs: f,
size: BigInt(block.length + node.Links.reduce((acc, curr) => acc + (curr.Tsize ?? 0), 0)),
size: BigInt(
block.length +
node.Links.reduce((acc, curr) => acc + (curr.Tsize ?? 0), 0)
),
originalPath: file.originalPath,
block
}
Expand All @@ -188,10 +232,27 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
return reducer
}

export interface FileBuilderOptions extends BuildFileBatchOptions, ReduceOptions {
layout: FileLayout
/**
 * A function that imports a file entry into the given blockstore and
 * resolves to the in-progress import result for the file's root node.
 *
 * A custom implementation may be supplied via `DagBuilderOptions.fileBuilder`;
 * otherwise `defaultFileBuilder` is used.
 */
export interface FileBuilder {
  (
    file: File,
    blockstore: WritableStorage,
    options: FileBuilderOptions
  ): Promise<InProgressImportResult>
}

export const fileBuilder = async (file: File, block: WritableStorage, options: FileBuilderOptions): Promise<InProgressImportResult> => {
return options.layout(buildFileBatch(file, block, options), reduce(file, block, options))
/**
 * Options accepted by a `FileBuilder` — combines batch-building options
 * (buffer importer, block-write concurrency) and reduce options with the
 * layout strategy for the file's DAG.
 */
export interface FileBuilderOptions
  extends BuildFileBatchOptions,
    ReduceOptions {
  /** Strategy used to arrange leaf blocks into the file's DAG */
  layout: FileLayout
}

/**
 * Default `FileBuilder` implementation.
 *
 * Splits the file's content into leaf blocks via `buildFileBatch`, then hands
 * those leaves to the configured `options.layout` strategy, which reduces them
 * to a single root node using `reduce`.
 *
 * @param file - the file entry being imported
 * @param block - the writable blockstore the blocks are persisted to
 * @param options - layout, batching and reduce configuration
 * @returns the in-progress import result for the file's root node
 */
export const defaultFileBuilder = async (
  file: File,
  block: WritableStorage,
  options: FileBuilderOptions
): Promise<InProgressImportResult> => {
  return options.layout(
    buildFileBatch(file, block, options),
    reduce(file, block, options)
  )
}
70 changes: 52 additions & 18 deletions packages/ipfs-unixfs-importer/src/dag-builder/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import { CustomProgressEvent } from 'progress-events'
import { InvalidContentError } from '../errors.js'
import { dirBuilder, type DirBuilderOptions } from './dir.js'
import { fileBuilder, type FileBuilderOptions } from './file.js'
import { defaultDirBuilder, type DirBuilder, type DirBuilderOptions } from './dir.js'
import { defaultFileBuilder, type FileBuilder, type FileBuilderOptions } from './file.js'
import type { ChunkValidator } from './validate-chunks.js'
import type { Chunker } from '../chunker/index.js'
import type { Directory, File, FileCandidate, ImportCandidate, ImporterProgressEvents, InProgressImportResult, WritableStorage } from '../index.js'
import type {
Directory,
File,
FileCandidate,
ImportCandidate,
ImporterProgressEvents,
InProgressImportResult,
WritableStorage
} from '../index.js'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

/**
Expand All @@ -27,8 +35,10 @@ export interface ImportReadProgress {
path?: string
}

export type DagBuilderProgressEvents =
ProgressEvent<'unixfs:importer:progress:file:read', ImportReadProgress>
/**
 * Progress event emitted as a file's content is read and chunked — fired via
 * `options.onProgress` with the cumulative bytes read, the current chunk size
 * and the entry's path.
 */
export type DagBuilderProgressEvents = ProgressEvent<
  'unixfs:importer:progress:file:read',
  ImportReadProgress
>

function isIterable (thing: any): thing is Iterable<any> {
return Symbol.iterator in thing
Expand All @@ -38,16 +48,18 @@ function isAsyncIterable (thing: any): thing is AsyncIterable<any> {
return Symbol.asyncIterator in thing
}

function contentAsAsyncIterable (content: Uint8Array | AsyncIterable<Uint8Array> | Iterable<Uint8Array>): AsyncIterable<Uint8Array> {
function contentAsAsyncIterable (
content: Uint8Array | AsyncIterable<Uint8Array> | Iterable<Uint8Array>
): AsyncIterable<Uint8Array> {
try {
if (content instanceof Uint8Array) {
return (async function * () {
yield content
}())
})()
} else if (isIterable(content)) {
return (async function * () {
yield * content
}())
})()
} else if (isAsyncIterable(content)) {
return content
}
Expand All @@ -58,16 +70,25 @@ function contentAsAsyncIterable (content: Uint8Array | AsyncIterable<Uint8Array>
throw new InvalidContentError('Content was invalid')
}

export interface DagBuilderOptions extends FileBuilderOptions, DirBuilderOptions, ProgressOptions<ImporterProgressEvents> {
/**
 * Options for `defaultDagBuilder` — combines file and directory builder
 * options with progress reporting and chunking configuration.
 */
export interface DagBuilderOptions
  extends FileBuilderOptions,
    DirBuilderOptions,
    ProgressOptions<ImporterProgressEvents> {
  /** Splits the (validated) content stream into chunks */
  chunker: Chunker
  /** Validates the incoming byte stream before it is handed to the chunker */
  chunkValidator: ChunkValidator
  /** NOTE(review): presumably wraps the import in an enclosing directory — behaviour not visible here, confirm */
  wrapWithDirectory: boolean
  /** Optional override for building directory nodes; falls back to `defaultDirBuilder` */
  dirBuilder?: DirBuilder
  /** Optional override for building file nodes; falls back to `defaultFileBuilder` */
  fileBuilder?: FileBuilder
}

export type ImporterSourceStream = AsyncIterable<ImportCandidate> | Iterable<ImportCandidate>
/**
 * The stream of entries accepted by a `DAGBuilder` — a sync or async
 * iterable of import candidates.
 */
export type ImporterSourceStream =
  | AsyncIterable<ImportCandidate>
  | Iterable<ImportCandidate>

/**
 * Transforms a stream of import candidates into an async stream of thunks —
 * invoking a yielded function performs the import of one entry against the
 * given blockstore.
 */
export interface DAGBuilder {
  (source: ImporterSourceStream, blockstore: WritableStorage): AsyncIterable<
    () => Promise<InProgressImportResult>
  >
}

export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
Expand All @@ -79,7 +100,7 @@ export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
originalPath = entry.path
entry.path = entry.path
.split('/')
.filter(path => path != null && path !== '.')
.filter((path) => path != null && path !== '.')
.join('/')
}

Expand All @@ -91,22 +112,31 @@ export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
content: (async function * () {
let bytesRead = 0n

for await (const chunk of options.chunker(options.chunkValidator(contentAsAsyncIterable(entry.content)))) {
for await (const chunk of options.chunker(
options.chunkValidator(contentAsAsyncIterable(entry.content))
)) {
const currentChunkSize = BigInt(chunk.byteLength)
bytesRead += currentChunkSize

options.onProgress?.(new CustomProgressEvent<ImportReadProgress>('unixfs:importer:progress:file:read', {
bytesRead,
chunkSize: currentChunkSize,
path: entry.path
}))
options.onProgress?.(
new CustomProgressEvent<ImportReadProgress>(
'unixfs:importer:progress:file:read',
{
bytesRead,
chunkSize: currentChunkSize,
path: entry.path
}
)
)

yield chunk
}
})(),
originalPath
}

const fileBuilder = options.fileBuilder ?? defaultFileBuilder

yield async () => fileBuilder(file, blockstore, options)
} else if (entry.path != null) {
const dir: Directory = {
Expand All @@ -116,6 +146,10 @@ export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
originalPath
}

const dirBuilder =
options.dirBuilder ??
defaultDirBuilder

yield async () => dirBuilder(dir, blockstore, options)
} else {
throw new Error('Import candidate must have content or path or both')
Expand Down
Loading
Loading