From e2ffaa78381ee3ed256013b58edc7b6aef392899 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Mon, 2 Aug 2021 14:41:03 -0400 Subject: [PATCH 1/4] intercept invalid AddrInfos --- dual/dual.go | 5 +++++ fullrt/dht.go | 5 +++++ go.mod | 2 +- go.sum | 2 ++ routing.go | 5 +++++ 5 files changed, 18 insertions(+), 1 deletion(-) diff --git a/dual/dual.go b/dual/dual.go index df88dcd14..e37207c4e 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -5,6 +5,7 @@ package dual import ( "context" "fmt" + "runtime" "sync" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -199,11 +200,15 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) } continue case pi, ok = <-wanCh: + _, file, line, _ := runtime.Caller(0) + peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), pi) if !ok { wanCh = nil continue } case pi, ok = <-lanCh: + _, file, line, _ := runtime.Caller(0) + peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), pi) if !ok { lanCh = nil continue diff --git a/fullrt/dht.go b/fullrt/dht.go index a6e96d4e5..d493b1b3e 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "math/rand" + "runtime" "sync" "sync/atomic" "time" @@ -1237,6 +1238,8 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. // NOTE: Assuming that this list of peers is unique if ps.TryAdd(p) { pi := dht.h.Peerstore().PeerInfo(p) + _, file, line, _ := runtime.Caller(0) + peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), pi) select { case peerOut <- pi: case <-ctx.Done(): @@ -1277,6 +1280,8 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. for _, prov := range provs { dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) logger.Debugf("got provider: %s", prov) + _, file, line, _ := runtime.Caller(0) + peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), *prov) if ps.TryAdd(prov.ID) { logger.Debugf("using provider: %s", prov) select { diff --git a/go.mod b/go.mod index 1f2120d7a..bfd6247d7 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p v0.14.4 - github.com/libp2p/go-libp2p-core v0.8.6 + github.com/libp2p/go-libp2p-core v0.9.1-0.20210802182816-07c43a93ef3e github.com/libp2p/go-libp2p-kbucket v0.4.7 github.com/libp2p/go-libp2p-peerstore v0.2.8 github.com/libp2p/go-libp2p-record v0.1.3 diff --git a/go.sum b/go.sum index 3dc69881e..648908e79 100644 --- a/go.sum +++ b/go.sum @@ -423,6 +423,8 @@ github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.6 h1:3S8g006qG6Tjpj1JdRK2S+TWc2DJQKX/RG9fdLeiLSU= github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM= +github.com/libp2p/go-libp2p-core v0.9.1-0.20210802182816-07c43a93ef3e h1:mJ6lZnwc/wBtSGfXhGCWWpDwVUKHA45WEBLAibmHaPY= +github.com/libp2p/go-libp2p-core v0.9.1-0.20210802182816-07c43a93ef3e/go.mod h1:XxUpbnHZbXggWyIdmL4m8GoSbQrcOv6vCAoIVEvfk08= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= diff --git a/routing.go b/routing.go index ca269e514..ec5a6461c 100644 --- a/routing.go +++ b/routing.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "runtime" "sync" "time" @@ -499,6 +500,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash // NOTE: Assuming that this list of peers is unique if ps.TryAdd(p) { pi := dht.peerstore.PeerInfo(p) + _, file, line, _ := runtime.Caller(0) + peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), pi) select { case peerOut <- pi: case <-ctx.Done(): @@ -532,6 +535,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash for _, prov := range provs { dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) logger.Debugf("got provider: %s", prov) + _, file, line, _ := runtime.Caller(0) + peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), *prov) if ps.TryAdd(prov.ID) { logger.Debugf("using provider: %s", prov) select { From ee213961b11a84c7a0784f1f6efda73fd9ed557c Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Mon, 2 Aug 2021 19:39:38 -0400 Subject: [PATCH 2/4] fix --- dual/dual.go | 13 +++++-------- fullrt/dht.go | 22 ++++++++++------------ go.mod | 2 +- routing.go | 17 ++++++++++++----- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/dual/dual.go b/dual/dual.go index e37207c4e..79832c5ee 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -5,7 +5,6 @@ package dual import ( "context" "fmt" - "runtime" "sync" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -168,7 +167,7 @@ func (dht *DHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStat } // FindProvidersAsync searches for peers who are able to provide a given key -func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { +func (dhtx *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { reqCtx, cancel := context.WithCancel(ctx) outCh := make(chan peer.AddrInfo) @@ -179,8 +178,8 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) subCtx, evtCh = routing.RegisterForQueryEvents(reqCtx) } - wanCh := dht.WAN.FindProvidersAsync(subCtx, key, count) - lanCh := dht.LAN.FindProvidersAsync(subCtx, key, count) + wanCh := dhtx.WAN.FindProvidersAsync(subCtx, key, count) + lanCh := dhtx.LAN.FindProvidersAsync(subCtx, key, count) zeroCount := (count == 0) go func() { defer cancel() @@ -200,15 +199,13 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) } continue case pi, ok = <-wanCh: - _, file, line, _ := runtime.Caller(0) - peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), pi) + dht.DebugAddrInfo("BUG", pi) if !ok { wanCh = nil continue } case pi, ok = <-lanCh: - _, file, line, _ := runtime.Caller(0) - peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), pi) + dht.DebugAddrInfo("BUG", pi) if !ok { lanCh = nil continue diff --git a/fullrt/dht.go b/fullrt/dht.go index d493b1b3e..b98ac5dcc 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "math/rand" - "runtime" "sync" "sync/atomic" "time" @@ -20,6 +19,7 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" + dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/gogo/protobuf/proto" "github.com/ipfs/go-cid" @@ -1222,7 +1222,7 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in return peerOut } -func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { +func (dhtx *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { defer close(peerOut) findAll := count == 0 @@ -1233,13 +1233,12 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. ps = peer.NewLimitedSet(count) } - provs := dht.ProviderManager.GetProviders(ctx, key) + provs := dhtx.ProviderManager.GetProviders(ctx, key) for _, p := range provs { // NOTE: Assuming that this list of peers is unique if ps.TryAdd(p) { - pi := dht.h.Peerstore().PeerInfo(p) - _, file, line, _ := runtime.Caller(0) - peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), pi) + pi := dhtx.h.Peerstore().PeerInfo(p) + dht.DebugAddrInfo("BUG", pi) select { case peerOut <- pi: case <-ctx.Done(): @@ -1254,7 +1253,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. } } - peers, err := dht.GetClosestPeers(ctx, string(key)) + peers, err := dhtx.GetClosestPeers(ctx, string(key)) if err != nil { return } @@ -1269,7 +1268,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. ID: p, }) - provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key) + provs, closest, err := dhtx.protoMessenger.GetProviders(ctx, p, key) if err != nil { return err } @@ -1278,10 +1277,9 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. // Add unique providers from request, up to 'count' for _, prov := range provs { - dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) + dhtx.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) logger.Debugf("got provider: %s", prov) - _, file, line, _ := runtime.Caller(0) - peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), *prov) + dht.DebugAddrInfo("BUG", *prov) if ps.TryAdd(prov.ID) { logger.Debugf("using provider: %s", prov) select { @@ -1309,7 +1307,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. return nil } - dht.execOnMany(queryctx, fn, peers, false) + dhtx.execOnMany(queryctx, fn, peers, false) } // FindPeer searches for a peer with given ID. diff --git a/go.mod b/go.mod index bfd6247d7..1f2120d7a 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p v0.14.4 - github.com/libp2p/go-libp2p-core v0.9.1-0.20210802182816-07c43a93ef3e + github.com/libp2p/go-libp2p-core v0.8.6 github.com/libp2p/go-libp2p-kbucket v0.4.7 github.com/libp2p/go-libp2p-peerstore v0.2.8 github.com/libp2p/go-libp2p-record v0.1.3 diff --git a/routing.go b/routing.go index ec5a6461c..87671f812 100644 --- a/routing.go +++ b/routing.go @@ -4,7 +4,7 @@ import ( "bytes" "context" "fmt" - "runtime" + "runtime/debug" "sync" "time" @@ -23,6 +23,15 @@ import ( "github.com/multiformats/go-multihash" ) +func DebugAddrInfo(msg string, ai peer.AddrInfo) { + for _, a := range ai.Addrs { + if a == nil { + logger.Errorf("(%s) AddrInfo with nil multiaddress: %s", msg, string(debug.Stack())) + return + } + } +} + // This file implements the Routing interface for the IpfsDHT struct. // Basic Put/Get @@ -500,8 +509,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash // NOTE: Assuming that this list of peers is unique if ps.TryAdd(p) { pi := dht.peerstore.PeerInfo(p) - _, file, line, _ := runtime.Caller(0) - peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), pi) + DebugAddrInfo("BUG", pi) select { case peerOut <- pi: case <-ctx.Done(): @@ -535,8 +543,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash for _, prov := range provs { dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) logger.Debugf("got provider: %s", prov) - _, file, line, _ := runtime.Caller(0) - peer.DebugAddrInfo(fmt.Sprintf("BUG %s:%d", file, line), *prov) + DebugAddrInfo("BUG", *prov) if ps.TryAdd(prov.ID) { logger.Debugf("using provider: %s", prov) select { From d03928f67131d3bbcc72dd3156f6d05c1ee58861 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Mon, 2 Aug 2021 19:45:58 -0400 Subject: [PATCH 3/4] fix --- fullrt/dht.go | 2 +- go.sum | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index b98ac5dcc..b657c0ea0 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1238,7 +1238,7 @@ func (dhtx *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash // NOTE: Assuming that this list of peers is unique if ps.TryAdd(p) { pi := dhtx.h.Peerstore().PeerInfo(p) - dht.DebugAddrInfo("BUG", pi) + kaddht.DebugAddrInfo("BUG", pi) select { case peerOut <- pi: case <-ctx.Done(): diff --git a/go.sum b/go.sum index 648908e79..3dc69881e 100644 --- a/go.sum +++ b/go.sum @@ -423,8 +423,6 @@ github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.6 h1:3S8g006qG6Tjpj1JdRK2S+TWc2DJQKX/RG9fdLeiLSU= github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM= -github.com/libp2p/go-libp2p-core v0.9.1-0.20210802182816-07c43a93ef3e h1:mJ6lZnwc/wBtSGfXhGCWWpDwVUKHA45WEBLAibmHaPY= -github.com/libp2p/go-libp2p-core v0.9.1-0.20210802182816-07c43a93ef3e/go.mod h1:XxUpbnHZbXggWyIdmL4m8GoSbQrcOv6vCAoIVEvfk08= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= From b5f242a551813382d1d3ded3268d1f489961c990 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Mon, 2 Aug 2021 19:55:56 -0400 Subject: [PATCH 4/4] fix --- fullrt/dht.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index b657c0ea0..61d31b494 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -19,7 +19,6 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" - dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/gogo/protobuf/proto" "github.com/ipfs/go-cid" @@ -1279,7 +1278,7 @@ func (dhtx *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash for _, prov := range provs { dhtx.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) logger.Debugf("got provider: %s", prov) - dht.DebugAddrInfo("BUG", *prov) + kaddht.DebugAddrInfo("BUG", *prov) if ps.TryAdd(prov.ID) { logger.Debugf("using provider: %s", prov) select {