diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index edea20b9c..86b0f260a 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -175,6 +175,23 @@ func (r *recallWantlist) ClearSentAt(c cid.Cid) { delete(r.sentAt, c) } +// Refresh moves wants from the sent list back to the pending list. +// If a want has been sent for longer than the interval, it is moved back to the pending list. +// Returns the number of wants that were refreshed. +func (r *recallWantlist) Refresh(now time.Time, interval time.Duration) int { + var refreshed int + for _, want := range r.sent.Entries() { + sentAt, ok := r.sentAt[want.Cid] + if ok && now.Sub(sentAt) >= interval { + r.pending.Add(want.Cid, want.Priority, want.WantType) + r.sent.Remove(want.Cid) + refreshed++ + } + } + + return refreshed +} + type peerConn struct { p peer.ID network MessageNetwork @@ -476,27 +493,27 @@ func (mq *MessageQueue) rebroadcastWantlist() { mq.rebroadcastIntervalLk.Unlock() // If some wants were transferred from the rebroadcast list - if mq.transferRebroadcastWants() { + if toRebroadcast := mq.transferRebroadcastWants(); toRebroadcast > 0 { // Send them out mq.sendMessage() + log.Infow("Rebroadcasting wants", "amount", toRebroadcast, "peer", mq.p) } } // Transfer wants from the rebroadcast lists into the pending lists. -func (mq *MessageQueue) transferRebroadcastWants() bool { +func (mq *MessageQueue) transferRebroadcastWants() int { mq.wllock.Lock() defer mq.wllock.Unlock() - // Check if there are any wants to rebroadcast - if mq.bcstWants.sent.Len() == 0 && mq.peerWants.sent.Len() == 0 { - return false - } - - // Copy sent wants into pending wants lists - mq.bcstWants.pending.Absorb(mq.bcstWants.sent) - mq.peerWants.pending.Absorb(mq.peerWants.sent) + mq.rebroadcastIntervalLk.Lock() + rebroadcastInterval := mq.rebroadcastInterval + mq.rebroadcastIntervalLk.Unlock() - return true + now := mq.clock.Now() + // Transfer sent wants into pending wants lists + transferred := mq.bcstWants.Refresh(now, rebroadcastInterval) + transferred += mq.peerWants.Refresh(now, rebroadcastInterval) + return transferred } func (mq *MessageQueue) signalWorkReady() {