Skip to content

Commit

Permalink
addressed review
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel committed Mar 7, 2023
1 parent be25008 commit 5e2ef88
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 47 deletions.
83 changes: 40 additions & 43 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,13 @@ type IpfsDHT struct {

autoRefresh bool

// A function performing a lookup request to a remote peer.ID, verifying that it is able to
// answer it correctly
lookupCheck func(context.Context, peer.ID) error
lookupCheckTimeout time.Duration
lookupCheckInterval time.Duration // time interval during which we don't try to query the same peer again
// timeout for the lookupCheck operation
lookupCheckTimeout time.Duration
// time interval during which we don't try to query the same peer again
lookupCheckInterval time.Duration
// recentlyCheckedPeers contains the peers recently queried with the time at which they were queried
recentlyCheckedPeers map[peer.ID]time.Time
recentlyCheckedPeersLk sync.Mutex
peerRecentlyQueried func(peer.ID) bool

// A function returning a set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
Expand Down Expand Up @@ -327,36 +325,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
dht.routingTable = rt
dht.bootstrapPeers = cfg.BootstrapPeers

dht.lookupCheck = func(ctx context.Context, p peer.ID) error {
// lookup request to p requesting for its own peer.ID
peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p)
// p should return at least its own peerid
if err == nil && len(peerids) == 0 {
return fmt.Errorf("peer %s failed to return its closest peers, got %d", p, len(peerids))
}
return err
}
dht.lookupCheckTimeout = cfg.RoutingTable.RefreshQueryTimeout
dht.recentlyCheckedPeers = make(map[peer.ID]time.Time)
dht.peerRecentlyQueried = func(p peer.ID) bool {
dht.recentlyCheckedPeersLk.Lock()

now := time.Now()

// clean recentlyCheckedPeers
for peerid, t := range dht.recentlyCheckedPeers {
// remove peers that have been queried more than lookupCheckInterval ago
if t.Add(dht.lookupCheckInterval).Before(now) {
delete(dht.recentlyCheckedPeers, peerid)
}
}

// if p still in recentlyCheckedPeers, it has been queried less than
// lookupCheckInterval ago
_, ok := dht.recentlyCheckedPeers[p]
dht.recentlyCheckedPeersLk.Unlock()
return ok
}

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
Expand Down Expand Up @@ -389,6 +359,39 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
return dht, nil
}

// lookupCheck performs a lookup request to a remote peer.ID, verifying that it is able to
// answer it correctly
func (dht *IpfsDHT) lookupCheck(ctx context.Context, p peer.ID) error {
// lookup request to p requesting for its own peer.ID
peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p)
// p should return at least its own peerid
if err == nil && len(peerids) == 0 {
return fmt.Errorf("peer %s failed to return its closest peers, got %d", p, len(peerids))
}
return err
}

// peerRecentlyQueried returns true if p has been queried less than dht.lookupCheckInterval ago
func (dht *IpfsDHT) peerRecentlyQueried(p peer.ID) bool {
dht.recentlyCheckedPeersLk.Lock()
defer dht.recentlyCheckedPeersLk.Unlock()

now := time.Now()

// clean recentlyCheckedPeers
for peerid, t := range dht.recentlyCheckedPeers {
// remove peers that have been queried more than lookupCheckInterval ago
if t.Add(dht.lookupCheckInterval).Before(now) {
delete(dht.recentlyCheckedPeers, peerid)
}
}

// if p still in recentlyCheckedPeers, it has been queried less than
// lookupCheckInterval ago
_, ok := dht.recentlyCheckedPeers[p]
return ok
}

func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
Expand Down Expand Up @@ -681,22 +684,16 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout)
defer cancel()

// connecting to the remote peer
if err := dht.host.Connect(livelinessCtx, peer.AddrInfo{ID: p}); err != nil {
logger.Debugw("failed connection to DHT peer", "peer", p, "error", err)
// performing a FIND_NODE query
if err := dht.lookupCheck(livelinessCtx, p); err != nil {
logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err)
return
}

// add peer.ID to recently queried peers
dht.recentlyCheckedPeersLk.Lock()
dht.recentlyCheckedPeers[p] = time.Now()
dht.recentlyCheckedPeersLk.Unlock()

// performing a FIND_NODE query
if err := dht.lookupCheck(livelinessCtx, p); err != nil {
logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err)
return
}
// if the FIND_NODE succeeded, the peer is considered as valid
dht.validPeerFound(ctx, p)
}
Expand Down
9 changes: 5 additions & 4 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-multistream"
"golang.org/x/exp/maps"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1565,7 +1566,7 @@ func TestFixLowPeers(t *testing.T) {

// remove blacklist of already contacted peers
mainD.recentlyCheckedPeersLk.Lock()
mainD.recentlyCheckedPeers = make(map[peer.ID]time.Time)
maps.Clear(mainD.recentlyCheckedPeers)
mainD.recentlyCheckedPeersLk.Unlock()

// but we will still get enough peers in the RT because of fix low Peers
Expand Down Expand Up @@ -1675,11 +1676,11 @@ func TestHandleRemotePeerProtocolChanges(t *testing.T) {

// clear connection history
dhtA.recentlyCheckedPeersLk.Lock()
dhtA.recentlyCheckedPeers = make(map[peer.ID]time.Time)
maps.Clear(dhtA.recentlyCheckedPeers)
dhtA.recentlyCheckedPeersLk.Unlock()

dhtB.recentlyCheckedPeersLk.Lock()
dhtB.recentlyCheckedPeers = make(map[peer.ID]time.Time)
maps.Clear(dhtB.recentlyCheckedPeers)
dhtB.recentlyCheckedPeersLk.Unlock()

// now assert both have each other in their RT
Expand Down Expand Up @@ -2184,7 +2185,7 @@ func TestPreconnectedNodes(t *testing.T) {

// clear d2 recent checked peers
d2.recentlyCheckedPeersLk.Lock()
d2.recentlyCheckedPeers = make(map[peer.ID]time.Time)
maps.Clear(d2.recentlyCheckedPeers)
d2.recentlyCheckedPeersLk.Unlock()

connect(t, ctx, d1, d2)
Expand Down

0 comments on commit 5e2ef88

Please sign in to comment.