Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

booster bitswap MVP executable #707

Merged
merged 8 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/boostd
/devnet
/booster-http
/booster-bitswap
/docgen-md
/docgen-openrpc
extern/filecoin-ffi/rust/target
Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ booster-http: $(BUILD_DEPS)
.PHONY: booster-http
BINS+=booster-http

booster-bitswap: $(BUILD_DEPS)
rm -f booster-bitswap
$(GOCC) build $(GOFLAGS) -o booster-bitswap ./cmd/booster-bitswap
.PHONY: booster-bitswap
BINS+=booster-bitswap

devnet: $(BUILD_DEPS)
rm -f devnet
$(GOCC) build $(GOFLAGS) -o devnet ./cmd/devnet
Expand Down
5 changes: 5 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type Boost interface {
BoostDagstorePiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) //perm:read
BoostDagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:read

// MethodGroup: Blockstore
BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, error) //perm:read
BlockstoreHas(ctx context.Context, c cid.Cid) (bool, error) //perm:read
BlockstoreGetSize(ctx context.Context, c cid.Cid) (int, error) //perm:read

// RuntimeSubsystems returns the subsystems that are enabled
// in this instance.
RuntimeSubsystems(ctx context.Context) (lapi.MinerSubsystems, error) //perm:read
Expand Down
39 changes: 39 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/boost.json.gz
Binary file not shown.
42 changes: 42 additions & 0 deletions cmd/booster-bitswap/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"os"

"github.com/filecoin-project/boost/build"
cliutil "github.com/filecoin-project/boost/cli/util"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
)

var log = logging.Logger("booster")

func main() {
app := &cli.App{
Name: "booster-bitswap",
Usage: "Bitswap endpoint for retrieval from Filecoin",
EnableBashCompletion: true,
Version: build.UserVersion(),
Flags: []cli.Flag{
cliutil.FlagVeryVerbose,
},
Commands: []*cli.Command{
runCmd,
},
}
app.Setup()

if err := app.Run(os.Args); err != nil {
os.Stderr.WriteString("Error: " + err.Error() + "\n")
}
}

func before(cctx *cli.Context) error {
_ = logging.SetLogLevel("booster", "INFO")

if cliutil.IsVeryVerbose {
_ = logging.SetLogLevel("booster", "DEBUG")
}

return nil
}
72 changes: 72 additions & 0 deletions cmd/booster-bitswap/remoteblockstore/remoteblockstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package remoteblockstore

import (
"context"
"errors"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"

"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

var log = logging.Logger("remote-blockstore")

var _ blockstore.Blockstore = (*RemoteBlockstore)(nil)

type RemoteBlockstoreAPI interface {
BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, error)
BlockstoreHas(ctx context.Context, c cid.Cid) (bool, error)
BlockstoreGetSize(ctx context.Context, c cid.Cid) (int, error)
}

// RemoteBlockstore is a read-only blockstore over all cids across all pieces on a provider.
type RemoteBlockstore struct {
api RemoteBlockstoreAPI
}

func NewRemoteBlockstore(api RemoteBlockstoreAPI) blockstore.Blockstore {
return &RemoteBlockstore{
api: api,
}
}

func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block, err error) {
log.Debugw("Get", "cid", c)
data, err := ro.api.BlockstoreGet(ctx, c)
log.Debugw("Get response", "cid", c, "error", err)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(data, c)
}

func (ro *RemoteBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
log.Debugw("Has", "cid", c)
has, err := ro.api.BlockstoreHas(ctx, c)
log.Debugw("Has response", "cid", c, "has", has, "error", err)
return has, err
}

func (ro *RemoteBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
log.Debugw("GetSize", "cid", c)
size, err := ro.api.BlockstoreGetSize(ctx, c)
log.Debugw("GetSize response", "cid", c, "size", size, "error", err)
return size, err
}

// --- UNSUPPORTED BLOCKSTORE METHODS -------
func (ro *RemoteBlockstore) DeleteBlock(context.Context, cid.Cid) error {
return errors.New("unsupported operation DeleteBlock")
}
func (ro *RemoteBlockstore) HashOnRead(_ bool) {}
func (ro *RemoteBlockstore) Put(context.Context, blocks.Block) error {
return errors.New("unsupported operation Put")
}
func (ro *RemoteBlockstore) PutMany(context.Context, []blocks.Block) error {
return errors.New("unsupported operation PutMany")
}
func (ro *RemoteBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("unsupported operation AllKeysChan")
}
102 changes: 102 additions & 0 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"strings"

"github.com/filecoin-project/boost/api"
bclient "github.com/filecoin-project/boost/api/client"
cliutil "github.com/filecoin-project/boost/cli/util"
"github.com/filecoin-project/boost/cmd/booster-bitswap/remoteblockstore"
"github.com/filecoin-project/go-jsonrpc"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/urfave/cli/v2"
)

var runCmd = &cli.Command{
Name: "run",
Usage: "Start a booster-bitswap process",
Before: before,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "pprof",
Usage: "run pprof web server on localhost:6070",
},
&cli.UintFlag{
Name: "port",
Usage: "the port to listen for bitswap requests on",
Value: 8888,
},
&cli.StringFlag{
Name: "api-boost",
Usage: "the endpoint for the boost API",
Required: true,
},
},
Action: func(cctx *cli.Context) error {
if cctx.Bool("pprof") {
go func() {
err := http.ListenAndServe("localhost:6070", nil)
if err != nil {
log.Error(err)
}
}()
}

// Connect to the Boost API
ctx := lcli.ReqContext(cctx)
boostAPIInfo := cctx.String("api-boost")
bapi, bcloser, err := getBoostAPI(ctx, boostAPIInfo)
if err != nil {
return fmt.Errorf("getting boost API: %w", err)
}
defer bcloser()

remoteStore := remoteblockstore.NewRemoteBlockstore(bapi)
// Create the server API
port := cctx.Int("port")
server := NewBitswapServer(port, remoteStore)

// Start the server
log.Infof("Starting booster-bitswap node on port %d", port)
err = server.Start(ctx)
if err != nil {
return err
}
// Monitor for shutdown.
<-ctx.Done()

log.Info("Shutting down...")

err = server.Stop()
if err != nil {
return err
}
log.Info("Graceful shutdown successful")

// Sync all loggers.
_ = log.Sync() //nolint:errcheck

return nil
},
}

func getBoostAPI(ctx context.Context, ai string) (api.Boost, jsonrpc.ClientCloser, error) {
ai = strings.TrimPrefix(strings.TrimSpace(ai), "BOOST_API_INFO=")
info := cliutil.ParseApiInfo(ai)
addr, err := info.DialArgs("v0")
if err != nil {
return nil, nil, fmt.Errorf("could not get DialArgs: %w", err)
}

log.Infof("Using boost API at %s", addr)
api, closer, err := bclient.NewBoostRPCV0(ctx, addr, info.AuthHeader())
if err != nil {
return nil, nil, fmt.Errorf("creating full node service API: %w", err)
}

return api, closer, nil
}
75 changes: 75 additions & 0 deletions cmd/booster-bitswap/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"context"
"crypto/rand"
"fmt"

bsnetwork "github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-bitswap/server"
blockstore "github.com/ipfs/go-ipfs-blockstore"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
"github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
)

type BitswapServer struct {
port int
remoteStore blockstore.Blockstore

ctx context.Context
cancel context.CancelFunc
server *server.Server
}

func NewBitswapServer(port int, remoteStore blockstore.Blockstore) *BitswapServer {
return &BitswapServer{port: port, remoteStore: remoteStore}
}

func (s *BitswapServer) Start(ctx context.Context) error {
s.ctx, s.cancel = context.WithCancel(ctx)
// setup libp2p host
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader)
if err != nil {
return err
}

host, err := libp2p.New(
libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", s.port),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", s.port),
),
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(quic.NewTransport),
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
libp2p.Identity(privKey),
libp2p.ResourceManager(network.NullResourceManager),
)
if err != nil {
return err
}

// start a bitswap session on the provider
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
if err != nil {
return err
}
bsopts := []server.Option{server.MaxOutstandingBytesPerPeer(1 << 20)}
net := bsnetwork.NewFromIpfsHost(host, nilRouter)
s.server = server.New(ctx, net, s.remoteStore, bsopts...)
net.Start(s.server)

log.Infow("bitswap server running", "multiaddrs", host.Addrs(), "peerId", host.ID())
return nil
}

func (s *BitswapServer) Stop() error {
s.cancel()
return s.server.Close()
}
Loading