Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log nil addr in AddrInfo #737

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions dual/dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (dht *DHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStat
}

// FindProvidersAsync searches for peers who are able to provide a given key
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
func (dhtx *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
reqCtx, cancel := context.WithCancel(ctx)
outCh := make(chan peer.AddrInfo)

Expand All @@ -178,8 +178,8 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int)
subCtx, evtCh = routing.RegisterForQueryEvents(reqCtx)
}

wanCh := dht.WAN.FindProvidersAsync(subCtx, key, count)
lanCh := dht.LAN.FindProvidersAsync(subCtx, key, count)
wanCh := dhtx.WAN.FindProvidersAsync(subCtx, key, count)
lanCh := dhtx.LAN.FindProvidersAsync(subCtx, key, count)
zeroCount := (count == 0)
go func() {
defer cancel()
Expand All @@ -199,11 +199,13 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int)
}
continue
case pi, ok = <-wanCh:
dht.DebugAddrInfo("BUG", pi)
if !ok {
wanCh = nil
continue
}
case pi, ok = <-lanCh:
dht.DebugAddrInfo("BUG", pi)
if !ok {
lanCh = nil
continue
Expand Down
16 changes: 9 additions & 7 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,7 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in
return peerOut
}

func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
func (dhtx *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer close(peerOut)

findAll := count == 0
Expand All @@ -1232,11 +1232,12 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
ps = peer.NewLimitedSet(count)
}

provs := dht.ProviderManager.GetProviders(ctx, key)
provs := dhtx.ProviderManager.GetProviders(ctx, key)
for _, p := range provs {
// NOTE: Assuming that this list of peers is unique
if ps.TryAdd(p) {
pi := dht.h.Peerstore().PeerInfo(p)
pi := dhtx.h.Peerstore().PeerInfo(p)
kaddht.DebugAddrInfo("BUG", pi)
select {
case peerOut <- pi:
case <-ctx.Done():
Expand All @@ -1251,7 +1252,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
}
}

peers, err := dht.GetClosestPeers(ctx, string(key))
peers, err := dhtx.GetClosestPeers(ctx, string(key))
if err != nil {
return
}
Expand All @@ -1266,7 +1267,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
ID: p,
})

provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
provs, closest, err := dhtx.protoMessenger.GetProviders(ctx, p, key)
if err != nil {
return err
}
Expand All @@ -1275,8 +1276,9 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.

// Add unique providers from request, up to 'count'
for _, prov := range provs {
dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
dhtx.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
logger.Debugf("got provider: %s", prov)
kaddht.DebugAddrInfo("BUG", *prov)
if ps.TryAdd(prov.ID) {
logger.Debugf("using provider: %s", prov)
select {
Expand Down Expand Up @@ -1304,7 +1306,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
return nil
}

dht.execOnMany(queryctx, fn, peers, false)
dhtx.execOnMany(queryctx, fn, peers, false)
}

// FindPeer searches for a peer with given ID.
Expand Down
12 changes: 12 additions & 0 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"runtime/debug"
"sync"
"time"

Expand All @@ -22,6 +23,15 @@ import (
"github.com/multiformats/go-multihash"
)

func DebugAddrInfo(msg string, ai peer.AddrInfo) {
for _, a := range ai.Addrs {
if a == nil {
logger.Errorf("(%s) AddrInfo with nil multiaddress: %s", msg, string(debug.Stack()))
return
}
}
}

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get
Expand Down Expand Up @@ -499,6 +509,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
// NOTE: Assuming that this list of peers is unique
if ps.TryAdd(p) {
pi := dht.peerstore.PeerInfo(p)
DebugAddrInfo("BUG", pi)
select {
case peerOut <- pi:
case <-ctx.Done():
Expand Down Expand Up @@ -532,6 +543,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
for _, prov := range provs {
dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
logger.Debugf("got provider: %s", prov)
DebugAddrInfo("BUG", *prov)
if ps.TryAdd(prov.ID) {
logger.Debugf("using provider: %s", prov)
select {
Expand Down