Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
backend fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Mar 23, 2023
1 parent 34e97fb commit 4725228
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 36 deletions.
Binary file added backend/backend.exe
Binary file not shown.
169 changes: 136 additions & 33 deletions backend/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"context"
"fmt"
bsfetcher "github.com/ipfs/go-fetcher/impl/blockservice"
"github.com/ipfs/go-fetcher"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-libipfs/blocks"
"github.com/ipfs/go-libipfs/files"
Expand All @@ -18,8 +18,11 @@ import (
"github.com/ipld/go-car/util"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/schema"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -179,14 +182,6 @@ func simpleSelectorToCar(ctx context.Context, bsrv blockservice.BlockService, p
}

r, w := io.Pipe()
// Setup header for the output car
err = car.WriteHeader(&car.CarHeader{
Roots: []cid.Cid{rootCid},
Version: 1,
}, w)
if err != nil {
return nil, fmt.Errorf("writing car header: %w", err)
}

rangeStr, hasRange := params.Get("bytes"), params.Has("bytes")
depthStr, hasDepth := params.Get("depth"), params.Has("depth")
Expand All @@ -204,6 +199,16 @@ func simpleSelectorToCar(ctx context.Context, bsrv blockservice.BlockService, p

go func() {
defer w.Close()

// Setup header for the output car
err = car.WriteHeader(&car.CarHeader{
Roots: []cid.Cid{rootCid},
Version: 1,
}, w)
if err != nil {
goLog.Error(fmt.Errorf("writing car header: %w", err))
}

blockGetter := merkledag.NewDAGService(bsrv).Session(ctx)
blockGetter = &nodeGetterToCarExporer{
ng: blockGetter,
Expand All @@ -213,17 +218,10 @@ func simpleSelectorToCar(ctx context.Context, bsrv blockservice.BlockService, p
dsrv := merkledag.NewReadOnlyDagService(blockGetter)

// Setup the UnixFS resolver.
fetcherConfig := bsfetcher.NewFetcherConfig(bsrv)
fetcherConfig.PrototypeChooser = dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) {
if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok {
return tlnkNd.LinkTargetNodePrototype(), nil
}
return basicnode.Prototype.Any, nil
})
fetcher := fetcherConfig.WithReifier(unixfsnode.Reify)
r := resolver.NewBasicResolver(fetcher)
f := newNodeGetterFetcherSingleUseFactory(ctx, blockGetter)
pathResolver := resolver.NewBasicResolver(f)

lastCid, remainder, err := r.ResolveToLastNode(ctx, ipfspath)
lastCid, remainder, err := pathResolver.ResolveToLastNode(ctx, ipfspath)
if err != nil {
goLog.Error(err)
return
Expand Down Expand Up @@ -259,28 +257,37 @@ func simpleSelectorToCar(ctx context.Context, bsrv blockservice.BlockService, p
// this is an error, so we're done
return
}
if hasRange {
// TODO: testing + check off by one errors
var numToRead int64
if *getRange.To < 0 {
size, err := f.Seek(0, io.SeekEnd)
if err != nil {
return
}
numToRead = (size - *getRange.To) - int64(getRange.From)
} else {
numToRead = int64(getRange.From) - *getRange.To

if !hasRange {
nw, err := io.Copy(io.Discard, f)
goLog.Debugf("nwritten %d", nw)
if err != nil {
goLog.Error(err)
}
return
}

if _, err := f.Seek(int64(getRange.From), io.SeekStart); err != nil {
// TODO: testing + check off by one errors
var numToRead int64
if *getRange.To < 0 {
size, err := f.Seek(0, io.SeekEnd)
if err != nil {
return
}
_, _ = io.CopyN(io.Discard, f, numToRead)
numToRead = (size - *getRange.To) - int64(getRange.From)
} else {
numToRead = int64(getRange.From) - *getRange.To
}

if _, err := f.Seek(int64(getRange.From), io.SeekStart); err != nil {
return
}
_, _ = io.CopyN(io.Discard, f, numToRead)
return
} else if d, ok := ufsNode.(files.Directory); ok {
if depthStr == "1" {
for d.Entries().Next() {
iter := d.Entries()
for iter.Next() {
}
return
}
Expand Down Expand Up @@ -375,6 +382,102 @@ func (n *nodeGetterToCarExporer) trySendBlock(block blocks.Block) error {

var _ format.NodeGetter = (*nodeGetterToCarExporer)(nil)

type nodeGetterFetcherSingleUseFactory struct {
linkSystem ipld.LinkSystem
protoChooser traversal.LinkTargetNodePrototypeChooser
}

func newNodeGetterFetcherSingleUseFactory(ctx context.Context, ng format.NodeGetter) *nodeGetterFetcherSingleUseFactory {
ls := cidlink.DefaultLinkSystem()
ls.TrustedStorage = true
ls.StorageReadOpener = blockOpener(ctx, ng)
ls.NodeReifier = unixfsnode.Reify

pc := dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) {
if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok {
return tlnkNd.LinkTargetNodePrototype(), nil
}
return basicnode.Prototype.Any, nil
})

return &nodeGetterFetcherSingleUseFactory{ls, pc}
}

func (n *nodeGetterFetcherSingleUseFactory) NewSession(ctx context.Context) fetcher.Fetcher {
return n
}

func (n *nodeGetterFetcherSingleUseFactory) NodeMatching(ctx context.Context, root ipld.Node, selector ipld.Node, cb fetcher.FetchCallback) error {
return n.nodeMatching(ctx, n.blankProgress(ctx), root, selector, cb)
}

func (n *nodeGetterFetcherSingleUseFactory) BlockOfType(ctx context.Context, link ipld.Link, nodePrototype ipld.NodePrototype) (ipld.Node, error) {
return n.linkSystem.Load(ipld.LinkContext{}, link, nodePrototype)
}

func (n *nodeGetterFetcherSingleUseFactory) BlockMatchingOfType(ctx context.Context, root ipld.Link, selector ipld.Node, nodePrototype ipld.NodePrototype, cb fetcher.FetchCallback) error {
// retrieve first node
prototype, err := n.PrototypeFromLink(root)
if err != nil {
return err
}
node, err := n.BlockOfType(ctx, root, prototype)
if err != nil {
return err
}

progress := n.blankProgress(ctx)
progress.LastBlock.Link = root
return n.nodeMatching(ctx, progress, node, selector, cb)
}

func (n *nodeGetterFetcherSingleUseFactory) PrototypeFromLink(lnk ipld.Link) (ipld.NodePrototype, error) {
return n.protoChooser(lnk, ipld.LinkContext{})
}

func (n *nodeGetterFetcherSingleUseFactory) nodeMatching(ctx context.Context, initialProgress traversal.Progress, node ipld.Node, match ipld.Node, cb fetcher.FetchCallback) error {
matchSelector, err := selector.ParseSelector(match)
if err != nil {
return err
}
return initialProgress.WalkMatching(node, matchSelector, func(prog traversal.Progress, n ipld.Node) error {
return cb(fetcher.FetchResult{
Node: n,
Path: prog.Path,
LastBlockPath: prog.LastBlock.Path,
LastBlockLink: prog.LastBlock.Link,
})
})
}

func (n *nodeGetterFetcherSingleUseFactory) blankProgress(ctx context.Context) traversal.Progress {
return traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: n.linkSystem,
LinkTargetNodePrototypeChooser: n.protoChooser,
},
}
}

func blockOpener(ctx context.Context, ng format.NodeGetter) ipld.BlockReadOpener {
return func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
cidLink, ok := lnk.(cidlink.Link)
if !ok {
return nil, fmt.Errorf("invalid link type for loading: %v", lnk)
}

blk, err := ng.Get(ctx, cidLink.Cid)
if err != nil {
return nil, err
}

return bytes.NewReader(blk.RawData()), nil
}
}

var _ fetcher.Fetcher = (*nodeGetterFetcherSingleUseFactory)(nil)
var _ fetcher.Factory = (*nodeGetterFetcherSingleUseFactory)(nil)

func rangeStrToGetRange(rangeStr string) (*gateway.GetRange, error) {
rangeElems := strings.Split(rangeStr, ":")
if len(rangeElems) > 2 {
Expand Down
13 changes: 10 additions & 3 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
golog "github.com/ipfs/go-log/v2"
carbs "github.com/ipld/go-car/v2/blockstore"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/spf13/cobra"
"log"
"net/http"
Expand Down Expand Up @@ -49,7 +52,7 @@ var rootCmd = &cobra.Command{
carbsLocation, _ := cmd.Flags().GetString("car-blockstore")

var bsrv blockservice.BlockService
if carbsLocation == "" {
if carbsLocation != "" {
bs, err := carbs.OpenReadOnly(carbsLocation)
if err != nil {
return err
Expand All @@ -67,11 +70,15 @@ var rootCmd = &cobra.Command{
return err
}

h, err := libp2p.New()
var r routing.Routing
h, err := libp2p.New(libp2p.Routing(func(host host.Host) (routing.PeerRouting, error) {
r, err = dht.New(cmd.Context(), host, dht.BootstrapPeersFunc(dht.GetDefaultBootstrapPeerAddrInfos))
return r, err
}))
if err != nil {
return err
}
n := network.NewFromIpfsHost(h, nil)
n := network.NewFromIpfsHost(h, r)
bsc := client.New(cmd.Context(), n, bs)
n.Start(bsc)
defer n.Stop()
Expand Down
31 changes: 31 additions & 0 deletions blockstore_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/ipfs/bifrost-gateway/lib"
"io"
"log"
"math/rand"
Expand Down Expand Up @@ -35,6 +36,36 @@ type proxyBlockStore struct {
rand *rand.Rand
}

func (ps *proxyBlockStore) Fetch(ctx context.Context, path string, cb lib.DataCallback) error {
u, err := url.Parse(fmt.Sprintf("%s/%s", ps.getRandomGatewayURL(), path))
if err != nil {
return err
}
resp, err := ps.httpClient.Do(&http.Request{
Method: http.MethodGet,
URL: u,
Header: http.Header{
"Accept": []string{"application/vnd.ipld.car"},
},
})
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http error from car gateway: %s", resp.Status)
}

err = cb(path, resp.Body)
if err != nil {
resp.Body.Close()
return err
}
return resp.Body.Close()
}

var _ lib.CarFetcher = (*proxyBlockStore)(nil)

func newProxyBlockStore(gatewayURL []string, cdns *cachedDNS) blockstore.Blockstore {
s := rand.NewSource(time.Now().Unix())
rand := rand.New(s)
Expand Down

0 comments on commit 4725228

Please sign in to comment.