Skip to content

Commit

Permalink
fix: exclude bootnode from protocol requests (#4909)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Nov 21, 2024
1 parent 23445a1 commit 440ff5c
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 11 deletions.
4 changes: 3 additions & 1 deletion pkg/storer/internal/reserve/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,9 @@ func TestEvictSOC(t *testing.T) {
if err != nil {
t.Fatal(err)
}
checkChunk(t, ts, chunks[9], false) // chunk should still persist, eg refCnt > 0
if has, _ := ts.ChunkStore().Has(context.Background(), chunks[0].Address()); !has {
t.Fatal("same address chunk should still persist, eg refCnt > 0")
}

evicted, err := r.EvictBatchBin(context.Background(), batch.ID, 10, swarm.MaxBins)
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions pkg/topology/kademlia/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ const (
// operation whose execution modifies a specific metrics.
type RecordOp func(*Counters)

// Bootnode will mark the peer metric as bootnode based on the bool arg.
func IsBootnode(b bool) RecordOp {
return func(cs *Counters) {
cs.Lock()
defer cs.Unlock()
cs.isBootnode = b
}
}

// PeerLogIn will first update the current last seen to the give time t and as
// the second it'll set the direction of the session connection to the given
// value. The force flag will force the peer re-login if he's already logged in.
Expand Down Expand Up @@ -155,6 +164,7 @@ type Counters struct {
// Bookkeeping.
isLoggedIn bool
peerAddress swarm.Address
isBootnode bool

// Counters.
lastSeenTimestamp int64
Expand Down Expand Up @@ -308,6 +318,13 @@ func (c *Collector) IsUnreachable(addr swarm.Address) bool {
// ExcludeOp is a function type used to filter peers on certain fields.
type ExcludeOp func(*Counters) bool

// IsBootnode is used to filter bootnode peers.
func Bootnode() ExcludeOp {
return func(cs *Counters) bool {
return cs.isBootnode
}
}

// Reachable is used to filter reachable or unreachable peers based on r.
func Reachability(filterReachable bool) ExcludeOp {
return func(cs *Counters) bool {
Expand Down
36 changes: 36 additions & 0 deletions pkg/topology/kademlia/internal/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,39 @@ func TestPeerMetricsCollector(t *testing.T) {
t.Fatalf("unexpected snapshot difference:\n%s", diff)
}
}

func TestExclude(t *testing.T) {
t.Parallel()

db, err := shed.NewDB("", nil)
if err != nil {
t.Fatal(err)
}
testutil.CleanupCloser(t, db)

mc, err := metrics.NewCollector(db)
if err != nil {
t.Fatal(err)
}

var addr = swarm.RandAddress(t)

// record unhealthy, unreachable, bootnode
mc.Record(addr, metrics.PeerHealth(false), metrics.IsBootnode(true), metrics.PeerReachability(p2p.ReachabilityStatusPrivate))

if have, want := mc.Exclude(addr), false; have != want {
t.Fatal("should not exclude any")
}

if have, want := mc.Exclude(addr, metrics.Bootnode()), true; have != want {
t.Fatal("should exclude bootnodes")
}

if have, want := mc.Exclude(addr, metrics.Reachability(false)), true; have != want {
t.Fatal("should exclude unreachble")
}

if have, want := mc.Exclude(addr, metrics.Health(false)), true; have != want {
t.Fatal("should exclude unhealthy")
}
}
19 changes: 10 additions & 9 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func (k *Kad) connectBootNodes(ctx context.Context) {
}

k.metrics.TotalOutboundConnections.Inc()
k.collector.Record(bzzAddress.Overlay, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound))
k.collector.Record(bzzAddress.Overlay, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound), im.IsBootnode(true))
loggerV1.Debug("connected to bootnode", "bootnode_address", addr)
connected++

Expand Down Expand Up @@ -1061,8 +1061,7 @@ outer:
addrs = append(addrs, connectedPeer)

if !fullnode {
// we continue here so we dont gossip
// about lightnodes to others.
// dont gossip about lightnodes to others.
continue
}
// if kademlia is closing, dont enqueue anymore broadcast requests
Expand Down Expand Up @@ -1314,9 +1313,9 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, filter topology.

// EachConnectedPeer implements topology.PeerIterator interface.
func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select) error {
filters := excludeOps(filter)
filters := excludeFromIterator(filter)
return k.connectedPeers.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) {
if len(filters) > 0 && k.opt.ExcludeFunc(filters...)(addr) {
if k.opt.ExcludeFunc(filters...)(addr) {
return false, false, nil
}
return f(addr, po)
Expand All @@ -1325,9 +1324,9 @@ func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select)

// EachConnectedPeerRev implements topology.PeerIterator interface.
func (k *Kad) EachConnectedPeerRev(f topology.EachPeerFunc, filter topology.Select) error {
filters := excludeOps(filter)
filters := excludeFromIterator(filter)
return k.connectedPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {
if len(filters) > 0 && k.opt.ExcludeFunc(filters...)(addr) {
if k.opt.ExcludeFunc(filters...)(addr) {
return false, false, nil
}
return f(addr, po)
Expand Down Expand Up @@ -1391,9 +1390,11 @@ func (k *Kad) SubscribeTopologyChange() (c <-chan struct{}, unsubscribe func())
return channel, unsubscribe
}

func excludeOps(filter topology.Select) []im.ExcludeOp {
func excludeFromIterator(filter topology.Select) []im.ExcludeOp {

ops := make([]im.ExcludeOp, 0, 2)
ops := make([]im.ExcludeOp, 0, 3)

ops = append(ops, im.Bootnode())

if filter.Reachable {
ops = append(ops, im.Reachability(false))
Expand Down
19 changes: 18 additions & 1 deletion pkg/topology/kademlia/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,13 +1200,17 @@ func TestStart(t *testing.T) {
t.Parallel()

var bootnodes []ma.Multiaddr
var bootnodesOverlays []swarm.Address
for i := 0; i < 10; i++ {
multiaddr, err := ma.NewMultiaddr(underlayBase + swarm.RandAddress(t).String())
overlay := swarm.RandAddress(t)

multiaddr, err := ma.NewMultiaddr(underlayBase + overlay.String())
if err != nil {
t.Fatal(err)
}

bootnodes = append(bootnodes, multiaddr)
bootnodesOverlays = append(bootnodesOverlays, overlay)
}

t.Run("non-empty addressbook", func(t *testing.T) {
Expand Down Expand Up @@ -1253,6 +1257,19 @@ func TestStart(t *testing.T) {

waitCounter(t, &conns, 3)
waitCounter(t, &failedConns, 0)

err := kad.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) {
for _, b := range bootnodesOverlays {
if b.Equal(addr) {
return false, false, errors.New("did not expect bootnode address from the iterator")
}
}
return false, false, nil

}, topology.Select{})
if err != nil {
t.Fatal(err)
}
})
}

Expand Down

0 comments on commit 440ff5c

Please sign in to comment.