Skip to content

Commit

Permalink
refactor: use functions for block broker creation (#286)
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain authored Oct 13, 2023
1 parent 9bad21b commit 43932a5
Show file tree
Hide file tree
Showing 15 changed files with 176 additions and 179 deletions.
4 changes: 2 additions & 2 deletions packages/helia/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ const options = {
before: async () => {
// use dynamic import otherwise the source may not have been built yet
const { createHelia } = await import('./dist/src/index.js')
const { BitswapBlockBrokerFactory } = await import('./dist/src/block-brokers/index.js')
const { bitswap } = await import('./dist/src/block-brokers/index.js')

const helia = await createHelia({
blockBrokers: [
BitswapBlockBrokerFactory
bitswap()
],
libp2p: {
addresses: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockAnnouncer, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { BlockAnnouncer, BlockBroker, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapOptions, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { ProgressOptions } from 'progress-events'

export class BitswapBlockBroker implements BlockAnnouncer<ProgressOptions<BitswapNotifyProgressEvents>>, BlockRetriever<
interface BitswapComponents {
libp2p: Libp2p
blockstore: Blockstore
hashers: MultihashHasher[]
}

export interface BitswapInit extends BitswapOptions {

}

class BitswapBlockBroker implements BlockAnnouncer<ProgressOptions<BitswapNotifyProgressEvents>>, BlockRetriever<
ProgressOptions<BitswapWantBlockProgressEvents>
>, Startable {
private readonly bitswap: Bitswap
private started: boolean

constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) {
constructor (components: BitswapComponents, init: BitswapInit = {}) {
const { libp2p, blockstore, hashers } = components

this.bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
Expand All @@ -29,7 +40,8 @@ ProgressOptions<BitswapWantBlockProgressEvents>

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
},
...init
})
this.started = false
}
Expand Down Expand Up @@ -61,6 +73,6 @@ ProgressOptions<BitswapWantBlockProgressEvents>
* A helper factory for users who want to override Helia `blockBrokers` but
* still want to use the default `BitswapBlockBroker`.
*/
export const BitswapBlockBrokerFactory: BlockBrokerFactoryFunction = (components): BitswapBlockBroker => {
return new BitswapBlockBroker(components.libp2p, components.blockstore, components.hashers)
export function bitswap (init: BitswapInit = {}): (components: BitswapComponents) => BlockBroker {
return (components) => new BitswapBlockBroker(components, init)
}
4 changes: 2 additions & 2 deletions packages/helia/src/block-brokers/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { BitswapBlockBroker, BitswapBlockBrokerFactory } from './bitswap-block-broker.js'
export { TrustlessGatewayBlockBroker } from './trustless-gateway-block-broker.js'
export { bitswap } from './bitswap.js'
export { trustlessGateway } from './trustless-gateway/index.js'
65 changes: 65 additions & 0 deletions packages/helia/src/block-brokers/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { logger } from '@libp2p/logger'
import { TrustlessGateway } from './trustless-gateway.js'
import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-broker')

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]

constructor (init: TrustlessGatewayBlockBrokerInit = {}) {
this.gateways = (init.gateways ?? DEFAULT_TRUSTLESS_GATEWAYS)
.map((gatewayOrUrl) => {
return new TrustlessGateway(gatewayOrUrl)
})
}

async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
// TODO: switch to toSorted when support is better
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
const aggregateErrors: Error[] = []

for (const gateway of sortedGateways) {
log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log.trace('got block for %c from %s', cid, gateway.url)
try {
await options.validateFn?.(block)
} catch (err) {
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
}

return block
} catch (err: unknown) {
log.error('failed to get block for %c from %s', cid, gateway.url, err)
if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
}
// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url)
break
}
}
}

throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`)
}
}
31 changes: 31 additions & 0 deletions packages/helia/src/block-brokers/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { TrustlessGatewayBlockBroker } from './broker.js'
import type { BlockRetriever } from '@helia/interface/src/blocks.js'
import type { ProgressEvent } from 'progress-events'

export const DEFAULT_TRUSTLESS_GATEWAYS = [
// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://dweb.link',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cf-ipfs.com',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://4everland.io',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://w3s.link',

// 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cloudflare-ipfs.com'
]

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

export interface TrustlessGatewayBlockBrokerInit {
gateways?: Array<string | URL>
}

export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): () => BlockRetriever {
return () => new TrustlessGatewayBlockBroker(init)
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
import { logger } from '@libp2p/logger'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-broker')

/**
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
Expand Down Expand Up @@ -129,64 +124,3 @@ export class TrustlessGateway {
this.#invalidBlocks++
}
}

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]

constructor (gatewaysOrUrls: Array<string | URL | TrustlessGateway>) {
this.gateways = gatewaysOrUrls.map((gatewayOrUrl) => {
if (gatewayOrUrl instanceof TrustlessGateway || Object.prototype.hasOwnProperty.call(gatewayOrUrl, 'getRawBlock')) {
return gatewayOrUrl as TrustlessGateway
}
// eslint-disable-next-line no-console
console.trace('creating new TrustlessGateway for %s', gatewayOrUrl)
return new TrustlessGateway(gatewayOrUrl)
})
}

async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
const aggregateErrors: Error[] = []
for (const gateway of sortedGateways) {
log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log.trace('got block for %c from %s', cid, gateway.url)
try {
await options.validateFn?.(block)
} catch (err) {
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
}

return block
} catch (err: unknown) {
log.error('failed to get block for %c from %s', cid, gateway.url, err)
if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
}
// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url)
break
}
}
}

throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`)
}
}
24 changes: 20 additions & 4 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { type BlockBroker } from '@helia/interface/blocks'
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
import { CustomProgressEvent } from 'progress-events'
import { bitswap, trustlessGateway } from './block-brokers/index.js'
import { PinsImpl } from './pins.js'
import { BlockStorage } from './storage.js'
import { assertDatastoreVersionIsCurrent } from './utils/datastore-version.js'
import { defaultHashers } from './utils/default-hashers.js'
import { NetworkedStorage } from './utils/networked-storage.js'
import type { HeliaInit } from '.'
import type { GCOptions, Helia } from '@helia/interface'
Expand All @@ -20,7 +21,6 @@ const log = logger('helia')
interface HeliaImplInit<T extends Libp2p = Libp2p> extends HeliaInit<T> {
libp2p: T
blockstore: Blockstore
blockBrokers: BlockBroker[]
datastore: Datastore
}

Expand All @@ -31,9 +31,25 @@ export class HeliaImpl implements Helia {
public pins: Pins

constructor (init: HeliaImplInit) {
const hashers = defaultHashers(init.hashers)

const components = {
blockstore: init.blockstore,
datastore: init.datastore,
libp2p: init.libp2p,
hashers
}

const blockBrokers = init.blockBrokers?.map((fn) => {
return fn(components)
}) ?? [
bitswap()(components),
trustlessGateway()()
]

const networkedStorage = new NetworkedStorage(init.blockstore, {
blockBrokers: init.blockBrokers,
hashers: init.hashers
blockBrokers,
hashers
})

this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? [])
Expand Down
44 changes: 3 additions & 41 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { BitswapBlockBroker, TrustlessGatewayBlockBroker } from './block-brokers/index.js'
import { HeliaImpl } from './helia.js'
import { defaultHashers } from './utils/default-hashers.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia, BlockBrokerFactoryFunction } from '@helia/interface'
import type { Helia } from '@helia/interface'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
Expand Down Expand Up @@ -98,7 +96,7 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
* A list of strategies used to fetch blocks when they are not present in
* the local blockstore
*/
blockBrokers?: Array<BlockBroker | BlockBrokerFactoryFunction>
blockBrokers?: Array<(components: any) => BlockBroker>

/**
* Pass `false` to not start the Helia node
Expand All @@ -123,23 +121,6 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
holdGcLock?: boolean
}

const DEFAULT_TRUSTLESS_GATEWAYS = [
// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://dweb.link',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cf-ipfs.com',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://4everland.io',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://w3s.link',

// 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cloudflare-ipfs.com'
]

/**
* Create and return a Helia node
*/
Expand All @@ -157,30 +138,11 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
libp2p = await createLibp2p(datastore, init.libp2p)
}

const hashers = defaultHashers(init.hashers)

const blockBrokers: BlockBroker[] = init.blockBrokers?.map((blockBroker: BlockBroker | BlockBrokerFactoryFunction): BlockBroker => {
if (typeof blockBroker !== 'function') {
return blockBroker satisfies BlockBroker
}
return blockBroker({
blockstore,
datastore,
libp2p,
hashers
}) satisfies BlockBroker
}) ?? [
new BitswapBlockBroker(libp2p, blockstore, hashers),
new TrustlessGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
...init,
datastore,
blockstore,
libp2p,
blockBrokers,
hashers
libp2p
})

if (init.start !== false) {
Expand Down
Loading

0 comments on commit 43932a5

Please sign in to comment.