Skip to content

Commit

Permalink
bitswap(client/messageque): expose donthavetimeout config
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Oct 24, 2024
1 parent 392493d commit d04bd72
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 93 deletions.
11 changes: 10 additions & 1 deletion bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (

var log = logging.Logger("bitswap/client")

type DontHaveTimeoutConfig = bsmq.DontHaveTimeoutConfig

// Option defines the functional option type that can be used to configure
// bitswap instances
type Option func(*Client)
Expand Down Expand Up @@ -71,6 +73,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
}
}

func WithDontHaveTimeoutConfig(cfg *DontHaveTimeoutConfig) Option {
return func(bs *Client) {
bs.dontHaveTimeoutConfig = cfg
}

Check warning on line 79 in bitswap/client/client.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/client.go#L76-L79

Added lines #L76 - L79 were not covered by tests
}

// Configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
Expand Down Expand Up @@ -133,7 +141,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
}
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
return bsmq.New(ctx, p, network, onDontHaveTimeout, bs.dontHaveTimeoutConfig)
}

sim := bssim.New()
Expand Down Expand Up @@ -242,6 +250,7 @@ type Client struct {

// whether we should actually simulate dont haves on request timeout
simulateDontHavesOnTimeout bool
dontHaveTimeoutConfig *DontHaveTimeoutConfig

// dupMetric will stay at 0
skipDuplicatedBlocksStats bool
Expand Down
122 changes: 60 additions & 62 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,41 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

const (
// dontHaveTimeout is used to simulate a DONT_HAVE when communicating with
func defaultDontHaveTimeoutConfig() *DontHaveTimeoutConfig {
cfg := &DontHaveTimeoutConfig{
DontHaveTimeout: 5 * time.Second,
MaxExpectedWantProcessTime: 2 * time.Second,
PingLatencyMultiplier: 3,
MessageLatencyAlpha: 0.5,
MessageLatencyMultiplier: 2,
}

cfg.MaxTimeout = cfg.DontHaveTimeout + cfg.MaxExpectedWantProcessTime
return cfg
}

type DontHaveTimeoutConfig struct {
// DontHaveTimeout is used to simulate a DONT_HAVE when communicating with
// a peer whose Bitswap client doesn't support the DONT_HAVE response,
// or when the peer takes too long to respond.
// If the peer doesn't respond to a want-block within the timeout, the
// local node assumes that the peer doesn't have the block.
dontHaveTimeout = 5 * time.Second

// maxExpectedWantProcessTime is the maximum amount of time we expect a
DontHaveTimeout time.Duration
// MaxExpectedWantProcessTime is the maximum amount of time we expect a
// peer takes to process a want and initiate sending a response to us
maxExpectedWantProcessTime = 2 * time.Second

// maxTimeout is the maximum allowed timeout, regardless of latency
maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime

// pingLatencyMultiplier is multiplied by the average ping time to
MaxExpectedWantProcessTime time.Duration
// MaxTimeout is the maximum allowed timeout, regardless of latency
MaxTimeout time.Duration
// PingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
pingLatencyMultiplier = 3

// messageLatencyAlpha is the alpha supplied to the message latency EWMA
messageLatencyAlpha = 0.5

PingLatencyMultiplier int
// MessageLatencyAlpha is the alpha supplied to the message latency EWMA
MessageLatencyAlpha float64
// To give a margin for error, the timeout is calculated as
// messageLatencyMultiplier * message latency
messageLatencyMultiplier = 2
)
// MessageLatencyMultiplier * message latency
MessageLatencyMultiplier int
}

// PeerConnection is a connection to a peer that can be pinged, and the
// average latency measured
Expand All @@ -61,16 +69,12 @@ type pendingWant struct {
// we ping the peer to estimate latency. If we receive a response from the
// peer we use the response latency.
type dontHaveTimeoutMgr struct {
clock clock.Clock
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid)
defaultTimeout time.Duration
maxTimeout time.Duration
pingLatencyMultiplier int
messageLatencyMultiplier int
maxExpectedWantProcessTime time.Duration
clock clock.Clock
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid, time.Duration)
config *DontHaveTimeoutConfig

// All variables below here must be protected by the lock
lk sync.RWMutex
Expand All @@ -92,39 +96,33 @@ type dontHaveTimeoutMgr struct {

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid, time.Duration), cfg *DontHaveTimeoutConfig, clock clock.Clock) *dontHaveTimeoutMgr {
if cfg == nil {
cfg = defaultDontHaveTimeoutConfig()
}
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, cfg, clock, nil)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(
pc PeerConnection,
onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration,
maxTimeout time.Duration,
pingLatencyMultiplier int,
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration,
onDontHaveTimeout func([]cid.Cid, time.Duration),
cfg *DontHaveTimeoutConfig,
clock clock.Clock,
timeoutsTriggered chan struct{},
) *dontHaveTimeoutMgr {
ctx, shutdown := context.WithCancel(context.Background())
mqp := &dontHaveTimeoutMgr{
clock: clock,
ctx: ctx,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: defaultTimeout,
messageLatency: &latencyEwma{alpha: messageLatencyAlpha},
defaultTimeout: defaultTimeout,
maxTimeout: maxTimeout,
pingLatencyMultiplier: pingLatencyMultiplier,
messageLatencyMultiplier: messageLatencyMultiplier,
maxExpectedWantProcessTime: maxExpectedWantProcessTime,
onDontHaveTimeout: onDontHaveTimeout,
timeoutsTriggered: timeoutsTriggered,
clock: clock,
ctx: ctx,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: cfg.DontHaveTimeout,
messageLatency: &latencyEwma{alpha: cfg.MessageLatencyAlpha},
onDontHaveTimeout: onDontHaveTimeout,
config: cfg,
timeoutsTriggered: timeoutsTriggered,
}

return mqp
Expand Down Expand Up @@ -189,7 +187,7 @@ func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// Wait up to defaultTimeout for a response to the ping
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.config.DontHaveTimeout)
defer cancel()

// Ping the peer
Expand Down Expand Up @@ -252,7 +250,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {

// Fire the timeout event for the expired wants
if len(expired) > 0 {
go dhtm.fireTimeout(expired)
go dhtm.fireTimeout(expired, dhtm.timeout)
}

if len(dhtm.wantQueue) == 0 {
Expand Down Expand Up @@ -340,14 +338,14 @@ func (dhtm *dontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
}

// fireTimeout fires the onDontHaveTimeout method with the timed out keys
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid, timeout time.Duration) {
// Make sure the timeout manager has not been shut down
if dhtm.ctx.Err() != nil {
return
}

// Fire the timeout
dhtm.onDontHaveTimeout(pending)
dhtm.onDontHaveTimeout(pending, timeout)

// signal a timeout fired
if dhtm.timeoutsTriggered != nil {
Expand All @@ -360,18 +358,18 @@ func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Dur
// The maximum expected time for a response is
// the expected time to process the want + (latency * multiplier)
// The multiplier is to provide some padding for variable latency.
timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.config.MaxExpectedWantProcessTime + time.Duration(dhtm.config.PingLatencyMultiplier)*latency
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout

Check warning on line 363 in bitswap/client/internal/messagequeue/donthavetimeoutmgr.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/internal/messagequeue/donthavetimeoutmgr.go#L363

Added line #L363 was not covered by tests
}
return timeout
}

// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.config.MessageLatencyMultiplier)
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout
}
return timeout
}
Expand Down
59 changes: 40 additions & 19 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type timeoutRecorder struct {
lk sync.Mutex
}

func (tr *timeoutRecorder) onTimeout(tks []cid.Cid) {
func (tr *timeoutRecorder) onTimeout(tks []cid.Cid, _ time.Duration) {
tr.lk.Lock()
defer tr.lk.Unlock()

Expand Down Expand Up @@ -84,8 +84,10 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged}
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
Expand Down Expand Up @@ -139,8 +141,10 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged}
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
Expand Down Expand Up @@ -176,8 +180,10 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
Expand Down Expand Up @@ -228,8 +234,10 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
Expand Down Expand Up @@ -262,8 +270,11 @@ func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MessageLatencyMultiplier = msgLatencyMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
Expand Down Expand Up @@ -309,8 +320,10 @@ func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) {
testMaxTimeout := time.Millisecond * 10
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.MessageLatencyMultiplier = msgLatencyMultiplier
cfg.MaxTimeout = testMaxTimeout
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
Expand Down Expand Up @@ -345,8 +358,11 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged, err: errors.New("ping error")}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.DontHaveTimeout = defaultTimeout
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
Expand Down Expand Up @@ -385,8 +401,11 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.DontHaveTimeout = defaultTimeout
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
Expand Down Expand Up @@ -424,8 +443,10 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
Expand Down
9 changes: 5 additions & 4 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,14 @@ type DontHaveTimeoutManager interface {
}

// New creates a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue {
onTimeout := func(ks []cid.Cid) {
log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p)
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout,
dontHaveTimeoutConfig *DontHaveTimeoutConfig) *MessageQueue {
onTimeout := func(ks []cid.Cid, duration time.Duration) {
log.Infow("Bitswap: timeout waiting for blocks", "timeout", duration.String(), "cids", ks, "peer", p)
onDontHaveTimeout(p, ks)
}
clock := clock.New()
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, clock)
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, dontHaveTimeoutConfig, clock)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, clock, nil)
}

Expand Down
Loading

0 comments on commit d04bd72

Please sign in to comment.