Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move providing responsabilities from bitswap to blockservice #677

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes:
* `boxo/bitswap/server`:
* A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672)
- `routing/http`: added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671)
- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do.

### Changed

Expand Down Expand Up @@ -61,6 +62,7 @@ The following emojis are used to highlight certain changes:
- `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth. [#636](https://github.com/ipfs/boxo/pull/636)
- `bitswap/network` fixed race condition when a timeout occurred before hole punching completed while establishing a first-time stream to a peer behind a NAT [#651](https://github.com/ipfs/boxo/pull/651)
- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. [#629](https://github.com/ipfs/boxo/pull/629)
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.

## [v0.21.0]

Expand Down
12 changes: 2 additions & 10 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/internal/defaults"
"github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
Expand Down Expand Up @@ -45,9 +44,8 @@ type bitswap interface {
}

var (
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
HasBlockBufferSize = defaults.HasBlockBufferSize
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
)

type Bitswap struct {
Expand Down Expand Up @@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
serverOptions = append(serverOptions, server.WithTracer(tracer))
}

if HasBlockBufferSize != defaults.HasBlockBufferSize {
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
}

ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
Expand All @@ -115,7 +109,6 @@ type Stat struct {
MessagesReceived uint64
BlocksSent uint64
DataSent uint64
ProvideBufLen int
}

func (bs *Bitswap) Stat() (*Stat, error) {
Expand All @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
Peers: ss.Peers,
BlocksSent: ss.BlocksSent,
DataSent: ss.DataSent,
ProvideBufLen: ss.ProvideBufLen,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
defer ig.Close()

Expand Down
4 changes: 4 additions & 0 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
if err != nil {
t.Fatal(err)
}
err = inst.Adapter.Provide(ctx, blk.Cid())
if err != nil {
t.Fatal(err)
}
}

func TestBasicSessions(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions bitswap/internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ const (
BitswapMaxOutstandingBytesPerPeer = 1 << 20
// the number of bytes we attempt to make each outgoing bitswap message
BitswapEngineTargetMessageSize = 16 * 1024
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large givent that?
HasBlockBufferSize = 256

// Maximum size of the wantlist we are willing to keep in memory.
MaxQueuedWantlistEntiresPerPeer = 1024
Expand Down
4 changes: 0 additions & 4 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option {
return Option{server.TaskWorkerCount(count)}
}

func ProvideEnabled(enabled bool) Option {
return Option{server.ProvideEnabled(enabled)}
}

func SetSendDontHaves(send bool) Option {
return Option{server.SetSendDontHaves(send)}
}
Expand Down
166 changes: 8 additions & 158 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,15 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"
)

var provideKeysBufferSize = 2048

var (
log = logging.Logger("bitswap/server")
sflog = log.Desugar()
)

const provideWorkerMax = 6

type Option func(*Server)

type Server struct {
Expand All @@ -59,20 +54,8 @@ type Server struct {

process process.Process

// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan cid.Cid

// Extra options to pass to the decision manager
engineOptions []decision.Option

// the size of channel buffer to use
hasBlockBufferSize int
// whether or not to make provide announcements
provideEnabled bool
}

func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server {
Expand All @@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
}()

s := &Server{
sentHistogram: bmetrics.SentHist(ctx),
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
taskWorkerCount: defaults.BitswapTaskWorkerCount,
network: network,
process: px,
provideEnabled: true,
hasBlockBufferSize: defaults.HasBlockBufferSize,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
sentHistogram: bmetrics.SentHist(ctx),
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
taskWorkerCount: defaults.BitswapTaskWorkerCount,
network: network,
process: px,
}
s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize)

for _, o := range options {
o(s)
Expand Down Expand Up @@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option {
}
}

// ProvideEnabled is an option for enabling/disabling provide announcements
func ProvideEnabled(enabled bool) Option {
return func(bs *Server) {
bs.provideEnabled = enabled
}
}

func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option {
o := decision.WithPeerBlockRequestFilter(pbrf)
return func(bs *Server) {
Expand Down Expand Up @@ -241,16 +213,6 @@ func MaxCidSize(n uint) Option {
}
}

// HasBlockBufferSize configure how big the new blocks buffer should be.
func HasBlockBufferSize(count int) Option {
if count < 0 {
panic("cannot have negative buffer size")
}
return func(bs *Server) {
bs.hasBlockBufferSize = count
}
}

// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
// which the bitswap server will replace a WantHave with a WantBlock response.
//
Expand Down Expand Up @@ -303,18 +265,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) {
bs.taskWorker(ctx, i)
})
}

if bs.provideEnabled {
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})

// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
px.Go(bs.provideWorker)
}
}

func (bs *Server) taskWorker(ctx context.Context, id int) {
Expand Down Expand Up @@ -422,18 +372,16 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) {
}

type Stat struct {
Peers []string
ProvideBufLen int
BlocksSent uint64
DataSent uint64
Peers []string
BlocksSent uint64
DataSent uint64
}

// Stat returns aggregated statistics about bitswap operations
func (bs *Server) Stat() (Stat, error) {
bs.counterLk.Lock()
s := bs.counters
bs.counterLk.Unlock()
s.ProvideBufLen = len(bs.newBlocks)

peers := bs.engine.Peers()
peersStr := make([]string, len(peers))
Expand All @@ -460,107 +408,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
// Send wanted blocks to decision engine
bs.engine.NotifyNewBlocks(blks)

// If the reprovider is enabled, send block to reprovider
if bs.provideEnabled {
for _, blk := range blks {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}

return nil
}

func (bs *Server) provideCollector(ctx context.Context) {
defer close(bs.provideKeys)
var toProvide []cid.Cid
var nextKey cid.Cid
var keysOut chan cid.Cid

for {
select {
case blkey, ok := <-bs.newBlocks:
if !ok {
log.Debug("newBlocks channel closed")
return
}

if keysOut == nil {
nextKey = blkey
keysOut = bs.provideKeys
} else {
toProvide = append(toProvide, blkey)
}
case keysOut <- nextKey:
if len(toProvide) > 0 {
nextKey = toProvide[0]
toProvide = toProvide[1:]
} else {
keysOut = nil
}
case <-ctx.Done():
return
}
}
}

func (bs *Server) provideWorker(px process.Process) {
// FIXME: OnClosingContext returns a _custom_ context type.
// Unfortunately, deriving a new cancelable context from this custom
// type fires off a goroutine. To work around this, we create a single
// cancelable context up-front and derive all sub-contexts from that.
//
// See: https://github.com/ipfs/go-ipfs/issues/5810
ctx := procctx.OnClosingContext(px)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

limit := make(chan struct{}, provideWorkerMax)

limitedGoProvide := func(k cid.Cid, wid int) {
defer func() {
// replace token when done
<-limit
}()

log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)

ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
defer cancel()

if err := bs.network.Provide(ctx, k); err != nil {
log.Warn(err)
}
}

// worker spawner, reads from bs.provideKeys until it closes, spawning a
// _ratelimited_ number of workers to handle each key.
for wid := 2; ; wid++ {
log.Debug("Bitswap.ProvideWorker.Loop")

select {
case <-px.Closing():
return
case k, ok := <-bs.provideKeys:
if !ok {
log.Debug("provideKeys channel closed")
return
}
select {
case <-px.Closing():
return
case limit <- struct{}{}:
go limitedGoProvide(k, wid)
}
}
}
}

func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
Expand Down
Loading
Loading