Skip to content

Commit

Permalink
bitswap(client/messageque): expose donthavetimeout config
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Oct 24, 2024
1 parent 392493d commit c9ca8a0
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 91 deletions.
9 changes: 9 additions & 0 deletions .idea/boxo.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions .idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (

var log = logging.Logger("bitswap/client")

type DontHaveTimeoutConfig = bsmq.DontHaveTimeoutConfig

// Option defines the functional option type that can be used to configure
// bitswap instances
type Option func(*Client)
Expand Down Expand Up @@ -71,6 +73,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
}
}

func WithDontHaveTimeoutConfig(cfg *DontHaveTimeoutConfig) Option {
return func(bs *Client) {
bs.dontHaveTimeoutConfig = cfg
}

Check warning on line 79 in bitswap/client/client.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/client.go#L76-L79

Added lines #L76 - L79 were not covered by tests
}

// Configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
Expand Down Expand Up @@ -133,7 +141,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
}
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
return bsmq.New(ctx, p, network, onDontHaveTimeout, bs.dontHaveTimeoutConfig)
}

sim := bssim.New()
Expand Down Expand Up @@ -242,6 +250,7 @@ type Client struct {

// whether we should actually simulate dont haves on request timeout
simulateDontHavesOnTimeout bool
dontHaveTimeoutConfig *DontHaveTimeoutConfig

// dupMetric will stay at 0
skipDuplicatedBlocksStats bool
Expand Down
118 changes: 58 additions & 60 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,41 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

const (
// dontHaveTimeout is used to simulate a DONT_HAVE when communicating with
func defaultDontHaveTimeoutConfig() *DontHaveTimeoutConfig {
cfg := &DontHaveTimeoutConfig{
DontHaveTimeout: 5 * time.Second,
MaxExpectedWantProcessTime: 2 * time.Second,
PingLatencyMultiplier: 3,
MessageLatencyAlpha: 0.5,
MessageLatencyMultiplier: 2,
}

cfg.MaxTimeout = cfg.DontHaveTimeout + cfg.MaxExpectedWantProcessTime
return cfg
}

type DontHaveTimeoutConfig struct {
// DontHaveTimeout is used to simulate a DONT_HAVE when communicating with
// a peer whose Bitswap client doesn't support the DONT_HAVE response,
// or when the peer takes too long to respond.
// If the peer doesn't respond to a want-block within the timeout, the
// local node assumes that the peer doesn't have the block.
dontHaveTimeout = 5 * time.Second

// maxExpectedWantProcessTime is the maximum amount of time we expect a
DontHaveTimeout time.Duration
// MaxExpectedWantProcessTime is the maximum amount of time we expect a
// peer takes to process a want and initiate sending a response to us
maxExpectedWantProcessTime = 2 * time.Second

// maxTimeout is the maximum allowed timeout, regardless of latency
maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime

// pingLatencyMultiplier is multiplied by the average ping time to
MaxExpectedWantProcessTime time.Duration
// MaxTimeout is the maximum allowed timeout, regardless of latency
MaxTimeout time.Duration
// PingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
pingLatencyMultiplier = 3

// messageLatencyAlpha is the alpha supplied to the message latency EWMA
messageLatencyAlpha = 0.5

PingLatencyMultiplier int
// MessageLatencyAlpha is the alpha supplied to the message latency EWMA
MessageLatencyAlpha float64
// To give a margin for error, the timeout is calculated as
// messageLatencyMultiplier * message latency
messageLatencyMultiplier = 2
)
// MessageLatencyMultiplier * message latency
MessageLatencyMultiplier int
}

// PeerConnection is a connection to a peer that can be pinged, and the
// average latency measured
Expand All @@ -61,16 +69,12 @@ type pendingWant struct {
// we ping the peer to estimate latency. If we receive a response from the
// peer we use the response latency.
type dontHaveTimeoutMgr struct {
clock clock.Clock
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid)
defaultTimeout time.Duration
maxTimeout time.Duration
pingLatencyMultiplier int
messageLatencyMultiplier int
maxExpectedWantProcessTime time.Duration
clock clock.Clock
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid, time.Duration)
config *DontHaveTimeoutConfig

// All variables below here must be protected by the lock
lk sync.RWMutex
Expand All @@ -92,39 +96,33 @@ type dontHaveTimeoutMgr struct {

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid, time.Duration), cfg *DontHaveTimeoutConfig, clock clock.Clock) *dontHaveTimeoutMgr {
if cfg == nil {
cfg = defaultDontHaveTimeoutConfig()
}
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, cfg, clock, nil)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(
pc PeerConnection,
onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration,
maxTimeout time.Duration,
pingLatencyMultiplier int,
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration,
onDontHaveTimeout func([]cid.Cid, time.Duration),
cfg *DontHaveTimeoutConfig,
clock clock.Clock,
timeoutsTriggered chan struct{},
) *dontHaveTimeoutMgr {
ctx, shutdown := context.WithCancel(context.Background())
mqp := &dontHaveTimeoutMgr{
clock: clock,
ctx: ctx,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: defaultTimeout,
messageLatency: &latencyEwma{alpha: messageLatencyAlpha},
defaultTimeout: defaultTimeout,
maxTimeout: maxTimeout,
pingLatencyMultiplier: pingLatencyMultiplier,
messageLatencyMultiplier: messageLatencyMultiplier,
maxExpectedWantProcessTime: maxExpectedWantProcessTime,
onDontHaveTimeout: onDontHaveTimeout,
timeoutsTriggered: timeoutsTriggered,
clock: clock,
ctx: ctx,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: cfg.DontHaveTimeout,
messageLatency: &latencyEwma{alpha: cfg.MessageLatencyAlpha},
onDontHaveTimeout: onDontHaveTimeout,
config: cfg,
timeoutsTriggered: timeoutsTriggered,
}

return mqp
Expand Down Expand Up @@ -189,7 +187,7 @@ func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// Wait up to defaultTimeout for a response to the ping
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.config.DontHaveTimeout)
defer cancel()

// Ping the peer
Expand Down Expand Up @@ -347,7 +345,7 @@ func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
}

// Fire the timeout
dhtm.onDontHaveTimeout(pending)
dhtm.onDontHaveTimeout(pending, dhtm.timeout)

// signal a timeout fired
if dhtm.timeoutsTriggered != nil {
Expand All @@ -360,18 +358,18 @@ func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Dur
// The maximum expected time for a response is
// the expected time to process the want + (latency * multiplier)
// The multiplier is to provide some padding for variable latency.
timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.config.MaxExpectedWantProcessTime + time.Duration(dhtm.config.PingLatencyMultiplier)*latency
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout

Check warning on line 363 in bitswap/client/internal/messagequeue/donthavetimeoutmgr.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/internal/messagequeue/donthavetimeoutmgr.go#L363

Added line #L363 was not covered by tests
}
return timeout
}

// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.config.MessageLatencyMultiplier)
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout
}
return timeout
}
Expand Down
Loading

0 comments on commit c9ca8a0

Please sign in to comment.