diff --git a/dht.go b/dht.go index 856de2a6f..cd72fb2d5 100644 --- a/dht.go +++ b/dht.go @@ -120,15 +120,13 @@ type IpfsDHT struct { autoRefresh bool - // A function performing a lookup request to a remote peer.ID, verifying that it is able to - // answer it correctly - lookupCheck func(context.Context, peer.ID) error - lookupCheckTimeout time.Duration - lookupCheckInterval time.Duration // time interval during which we don't try to query the same peer again + // timeout for the lookupCheck operation + lookupCheckTimeout time.Duration + // time interval during which we don't try to query the same peer again + lookupCheckInterval time.Duration // recentlyCheckedPeers contains the peers recently queried with the time at which they were queried recentlyCheckedPeers map[peer.ID]time.Time recentlyCheckedPeersLk sync.Mutex - peerRecentlyQueried func(peer.ID) bool // A function returning a set of bootstrap peers to fallback on if all other attempts to fix // the routing table fail (or, e.g., this is the first time this node is @@ -327,36 +325,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err dht.routingTable = rt dht.bootstrapPeers = cfg.BootstrapPeers - dht.lookupCheck = func(ctx context.Context, p peer.ID) error { - // lookup request to p requesting for its own peer.ID - peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) - // p should return at least its own peerid - if err == nil && len(peerids) == 0 { - return fmt.Errorf("peer %s failed to return its closest peers, got %d", p, len(peerids)) - } - return err - } dht.lookupCheckTimeout = cfg.RoutingTable.RefreshQueryTimeout dht.recentlyCheckedPeers = make(map[peer.ID]time.Time) - dht.peerRecentlyQueried = func(p peer.ID) bool { - dht.recentlyCheckedPeersLk.Lock() - - now := time.Now() - - // clean recentlyCheckedPeers - for peerid, t := range dht.recentlyCheckedPeers { - // remove peers that have been queried more than lookupCheckInterval ago - if t.Add(dht.lookupCheckInterval).Before(now) { - delete(dht.recentlyCheckedPeers, peerid) - } - } - - // if p still in recentlyCheckedPeers, it has been queried less than - // lookupCheckInterval ago - _, ok := dht.recentlyCheckedPeers[p] - dht.recentlyCheckedPeersLk.Unlock() - return ok - } // rt refresh manager rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold) @@ -389,6 +359,39 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err return dht, nil } +// lookupCheck performs a lookup request to a remote peer.ID, verifying that it is able to +// answer it correctly +func (dht *IpfsDHT) lookupCheck(ctx context.Context, p peer.ID) error { + // lookup request to p requesting for its own peer.ID + peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) + // p should return at least its own peerid + if err == nil && len(peerids) == 0 { + return fmt.Errorf("peer %s failed to return its closest peers, got %d", p, len(peerids)) + } + return err +} + +// peerRecentlyQueried returns true if p has been queried less than dht.lookupCheckInterval ago +func (dht *IpfsDHT) peerRecentlyQueried(p peer.ID) bool { + dht.recentlyCheckedPeersLk.Lock() + defer dht.recentlyCheckedPeersLk.Unlock() + + now := time.Now() + + // clean recentlyCheckedPeers + for peerid, t := range dht.recentlyCheckedPeers { + // remove peers that have been queried more than lookupCheckInterval ago + if t.Add(dht.lookupCheckInterval).Before(now) { + delete(dht.recentlyCheckedPeers, peerid) + } + } + + // if p still in recentlyCheckedPeers, it has been queried less than + // lookupCheckInterval ago + _, ok := dht.recentlyCheckedPeers[p] + return ok +} + func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) { keyGenFnc := func(cpl uint) (string, error) { p, err := dht.routingTable.GenRandPeerID(cpl) @@ -681,22 +684,16 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) { livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout) defer cancel() - // connecting to the remote peer - if err := dht.host.Connect(livelinessCtx, peer.AddrInfo{ID: p}); err != nil { - logger.Debugw("failed connection to DHT peer", "peer", p, "error", err) + // performing a FIND_NODE query + if err := dht.lookupCheck(livelinessCtx, p); err != nil { + logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err) return } - // add peer.ID to recently queried peers dht.recentlyCheckedPeersLk.Lock() dht.recentlyCheckedPeers[p] = time.Now() dht.recentlyCheckedPeersLk.Unlock() - // performing a FIND_NODE query - if err := dht.lookupCheck(livelinessCtx, p); err != nil { - logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err) - return - } // if the FIND_NODE succeeded, the peer is considered as valid dht.validPeerFound(ctx, p) } diff --git a/dht_test.go b/dht_test.go index d29d3a3d7..f8590c4f4 100644 --- a/dht_test.go +++ b/dht_test.go @@ -23,6 +23,7 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" "github.com/multiformats/go-multistream" + "golang.org/x/exp/maps" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1565,7 +1566,7 @@ func TestFixLowPeers(t *testing.T) { // remove blacklist of already contacted peers mainD.recentlyCheckedPeersLk.Lock() - mainD.recentlyCheckedPeers = make(map[peer.ID]time.Time) + maps.Clear(mainD.recentlyCheckedPeers) mainD.recentlyCheckedPeersLk.Unlock() // but we will still get enough peers in the RT because of fix low Peers @@ -1675,11 +1676,11 @@ func TestHandleRemotePeerProtocolChanges(t *testing.T) { // clear connection history dhtA.recentlyCheckedPeersLk.Lock() - dhtA.recentlyCheckedPeers = make(map[peer.ID]time.Time) + maps.Clear(dhtA.recentlyCheckedPeers) dhtA.recentlyCheckedPeersLk.Unlock() dhtB.recentlyCheckedPeersLk.Lock() - dhtB.recentlyCheckedPeers = make(map[peer.ID]time.Time) + maps.Clear(dhtB.recentlyCheckedPeers) dhtB.recentlyCheckedPeersLk.Unlock() // now assert both have each other in their RT @@ -2184,7 +2185,7 @@ func TestPreconnectedNodes(t *testing.T) { // clear d2 recent checked peers d2.recentlyCheckedPeersLk.Lock() - d2.recentlyCheckedPeers = make(map[peer.ID]time.Time) + maps.Clear(d2.recentlyCheckedPeers) d2.recentlyCheckedPeersLk.Unlock() connect(t, ctx, d1, d2)