Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose closer for GetResponse #241

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 19 additions & 31 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
}

if !allValid {
// can't shift in place because we don't want to clobber callers.
ks2 := make([]cid.Cid, 0, len(ks))
for _, c := range ks {
// hash security
Expand Down Expand Up @@ -333,52 +334,39 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
return
}

// batch available blocks together
const batchSize = 32
batch := make([]blocks.Block, 0, batchSize)
var cache [1]blocks.Block // preallocate once for all iterations
for {
var noMoreBlocks bool
batchLoop:
for len(batch) < batchSize {
select {
case b, ok := <-rblocks:
if !ok {
noMoreBlocks = true
break batchLoop
}

logger.Debugf("BlockService.BlockFetched %s", b.Cid())
batch = append(batch, b)
case <-ctx.Done():
var b blocks.Block
select {
case v, ok := <-rblocks:
if !ok {
return
default:
break batchLoop
}
b = v
case <-ctx.Done():
return
}

// also write in the blockstore for caching, inform the exchange that the blocks are available
err = bs.PutMany(ctx, batch)
// write in the blockstore for caching
err = bs.Put(ctx, b)
if err != nil {
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
return
}

err = f.NotifyNewBlocks(ctx, batch...)
// inform the exchange that the blocks are available
cache[0] = b
err = f.NotifyNewBlocks(ctx, cache[:]...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}
cache[0] = nil // early gc

for _, b := range batch {
select {
case out <- b:
case <-ctx.Done():
return
}
}
batch = batch[:0]
if noMoreBlocks {
break
select {
case out <- b:
case <-ctx.Done():
return
}
}
}()
Expand Down
Binary file added examples/gateway/car-file-gateway/libp2pio.car
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/gateway/car/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func main() {
}
defer f.Close()

gwAPI, err := common.NewBlocksGateway(blockService, nil)
gwAPI, err := gateway.NewBlocksGateway(blockService)
if err != nil {
log.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion examples/gateway/car/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/ipfs/boxo/examples/gateway/common"
"github.com/ipfs/boxo/gateway"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/stretchr/testify/assert"
Expand All @@ -22,7 +23,7 @@ func newTestServer() (*httptest.Server, io.Closer, error) {
return nil, nil, err
}

gateway, err := common.NewBlocksGateway(blockService, nil)
gateway, err := gateway.NewBlocksGateway(blockService)
if err != nil {
_ = f.Close()
return nil, nil, err
Expand Down
231 changes: 1 addition & 230 deletions examples/gateway/common/blocks.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,12 @@
package common

import (
"context"
"errors"
"fmt"
"net/http"
gopath "path"

"github.com/ipfs/boxo/blockservice"
blockstore "github.com/ipfs/boxo/blockstore"
iface "github.com/ipfs/boxo/coreiface"
nsopts "github.com/ipfs/boxo/coreiface/options/namesys"
ifacepath "github.com/ipfs/boxo/coreiface/path"
bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/gateway"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/unixfs"
ufile "github.com/ipfs/boxo/ipld/unixfs/file"
uio "github.com/ipfs/boxo/ipld/unixfs/io"
"github.com/ipfs/boxo/namesys"
"github.com/ipfs/boxo/namesys/resolve"
ipfspath "github.com/ipfs/boxo/path"
"github.com/ipfs/boxo/path/resolver"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-unixfsnode"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/schema"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
mc "github.com/multiformats/go-multicodec"
)

func NewBlocksHandler(gw *BlocksGateway, port int) http.Handler {
func NewBlocksHandler(gw gateway.IPFSBackend, port int) http.Handler {
headers := map[string][]string{}
gateway.AddAccessControlHeaders(headers)

Expand All @@ -50,202 +20,3 @@ func NewBlocksHandler(gw *BlocksGateway, port int) http.Handler {
mux.Handle("/ipns/", gwHandler)
return mux
}

type BlocksGateway struct {
blockStore blockstore.Blockstore
blockService blockservice.BlockService
dagService format.DAGService
resolver resolver.Resolver

// Optional routing system to handle /ipns addresses.
namesys namesys.NameSystem
routing routing.ValueStore
}

func NewBlocksGateway(blockService blockservice.BlockService, routing routing.ValueStore) (*BlocksGateway, error) {
// Setup the DAG services, which use the CAR block store.
dagService := merkledag.NewDAGService(blockService)

// Setup the UnixFS resolver.
fetcherConfig := bsfetcher.NewFetcherConfig(blockService)
fetcherConfig.PrototypeChooser = dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) {
if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok {
return tlnkNd.LinkTargetNodePrototype(), nil
}
return basicnode.Prototype.Any, nil
})
fetcher := fetcherConfig.WithReifier(unixfsnode.Reify)
resolver := resolver.NewBasicResolver(fetcher)

// Setup a name system so that we are able to resolve /ipns links.
var (
ns namesys.NameSystem
err error
)
if routing != nil {
ns, err = namesys.NewNameSystem(routing)
if err != nil {
return nil, err
}
}

return &BlocksGateway{
blockStore: blockService.Blockstore(),
blockService: blockService,
dagService: dagService,
resolver: resolver,
routing: routing,
namesys: ns,
}, nil
}

func (api *BlocksGateway) GetUnixFsNode(ctx context.Context, p ifacepath.Resolved) (files.Node, error) {
nd, err := api.resolveNode(ctx, p)
if err != nil {
return nil, err
}

return ufile.NewUnixfsFile(ctx, api.dagService, nd)
}

func (api *BlocksGateway) LsUnixFsDir(ctx context.Context, p ifacepath.Resolved) (<-chan iface.DirEntry, error) {
node, err := api.resolveNode(ctx, p)
if err != nil {
return nil, err
}

dir, err := uio.NewDirectoryFromNode(api.dagService, node)
if err != nil {
return nil, err
}

out := make(chan iface.DirEntry, uio.DefaultShardWidth)

go func() {
defer close(out)
for l := range dir.EnumLinksAsync(ctx) {
select {
case out <- api.processLink(ctx, l):
case <-ctx.Done():
return
}
}
}()

return out, nil
}

func (api *BlocksGateway) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
return api.blockService.GetBlock(ctx, c)
}

func (api *BlocksGateway) GetIPNSRecord(ctx context.Context, c cid.Cid) ([]byte, error) {
if api.routing == nil {
return nil, routing.ErrNotSupported
}

// Fails fast if the CID is not an encoded Libp2p Key, avoids wasteful
// round trips to the remote routing provider.
if mc.Code(c.Type()) != mc.Libp2pKey {
return nil, errors.New("provided cid is not an encoded libp2p key")
}

// The value store expects the key itself to be encoded as a multihash.
id, err := peer.FromCid(c)
if err != nil {
return nil, err
}

return api.routing.GetValue(ctx, "/ipns/"+string(id))
}

func (api *BlocksGateway) GetDNSLinkRecord(ctx context.Context, hostname string) (ifacepath.Path, error) {
if api.namesys != nil {
p, err := api.namesys.Resolve(ctx, "/ipns/"+hostname, nsopts.Depth(1))
if err == namesys.ErrResolveRecursion {
err = nil
}
return ifacepath.New(p.String()), err
}

return nil, errors.New("not implemented")
}

func (api *BlocksGateway) IsCached(ctx context.Context, p ifacepath.Path) bool {
rp, err := api.ResolvePath(ctx, p)
if err != nil {
return false
}

has, _ := api.blockStore.Has(ctx, rp.Cid())
return has
}

func (api *BlocksGateway) ResolvePath(ctx context.Context, p ifacepath.Path) (ifacepath.Resolved, error) {
if _, ok := p.(ifacepath.Resolved); ok {
return p.(ifacepath.Resolved), nil
}

err := p.IsValid()
if err != nil {
return nil, err
}

ipath := ipfspath.Path(p.String())
if ipath.Segments()[0] == "ipns" {
ipath, err = resolve.ResolveIPNS(ctx, api.namesys, ipath)
if err != nil {
return nil, err
}
}

if ipath.Segments()[0] != "ipfs" {
return nil, fmt.Errorf("unsupported path namespace: %s", p.Namespace())
}

node, rest, err := api.resolver.ResolveToLastNode(ctx, ipath)
if err != nil {
return nil, err
}

root, err := cid.Parse(ipath.Segments()[1])
if err != nil {
return nil, err
}

return ifacepath.NewResolvedPath(ipath, node, root, gopath.Join(rest...)), nil
}

func (api *BlocksGateway) resolveNode(ctx context.Context, p ifacepath.Path) (format.Node, error) {
rp, err := api.ResolvePath(ctx, p)
if err != nil {
return nil, err
}

node, err := api.dagService.Get(ctx, rp.Cid())
if err != nil {
return nil, fmt.Errorf("get node: %w", err)
}
return node, nil
}

func (api *BlocksGateway) processLink(ctx context.Context, result unixfs.LinkResult) iface.DirEntry {
if result.Err != nil {
return iface.DirEntry{Err: result.Err}
}

link := iface.DirEntry{
Name: result.Link.Name,
Cid: result.Link.Cid,
}

switch link.Cid.Type() {
case cid.Raw:
link.Type = iface.TFile
link.Size = result.Link.Size
case cid.DagProtobuf:
link.Size = result.Link.Size
}

return link
}
2 changes: 1 addition & 1 deletion examples/gateway/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func main() {
routing := newProxyRouting(*gatewayUrlPtr, nil)

// Creates the gateway with the block service and the routing.
gwAPI, err := common.NewBlocksGateway(blockService, routing)
gwAPI, err := gateway.NewBlocksGateway(blockService, gateway.WithValueStore(routing))
if err != nil {
log.Fatal(err)
}
Expand Down
5 changes: 3 additions & 2 deletions examples/gateway/proxy/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/examples/gateway/common"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/gateway"
"github.com/ipfs/go-block-format"
"github.com/stretchr/testify/assert"
)
Expand All @@ -22,12 +23,12 @@ func newProxyGateway(t *testing.T, rs *httptest.Server) *httptest.Server {
blockService := blockservice.New(blockStore, offline.Exchange(blockStore))
routing := newProxyRouting(rs.URL, nil)

gateway, err := common.NewBlocksGateway(blockService, routing)
gw, err := gateway.NewBlocksGateway(blockService, gateway.WithValueStore(routing))
if err != nil {
t.Error(err)
}

handler := common.NewBlocksHandler(gateway, 0)
handler := common.NewBlocksHandler(gw, 0)
ts := httptest.NewServer(handler)
t.Cleanup(ts.Close)

Expand Down
Loading