Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: simple backoff strategy after 3 subscribe failures #1238

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package filter
import (
"context"
"encoding/json"
"errors"
"time"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
Expand All @@ -29,6 +31,7 @@ func (fc FilterConfig) String() string {
}

// Tunables for the filter subscription loop.
const (
	// filterSubLoopInterval is a 5-second period.
	// NOTE(review): presumably the tick interval handed to subscriptionLoop;
	// the call site is not visible here — confirm.
	filterSubLoopInterval = 5 * time.Second

	// filterSubMaxErrCnt is the threshold compared against Sub.errcnt: once
	// errcnt reaches this value the loop stops triggering resubscribe for
	// entries read from the closing channel, until the next ticker tick
	// resets errcnt to zero.
	filterSubMaxErrCnt = 3
)

type Sub struct {
ContentFilter protocol.ContentFilter
Expand All @@ -43,6 +46,7 @@ type Sub struct {
onlineChecker onlinechecker.OnlineChecker
resubscribeInProgress bool
id string
errcnt int
}

type subscribeParameters struct {
Expand Down Expand Up @@ -107,13 +111,14 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
}
}

func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) {
func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) {
defer utils.LogOnPanic()
ticker := time.NewTicker(batchInterval)
ticker := time.NewTicker(loopInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
apiSub.errcnt = 0 //reset errorCount
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers &&
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
Expand All @@ -123,9 +128,11 @@ func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) {
apiSub.cleanup()
return
case subId := <-apiSub.closing:
apiSub.resubscribeInProgress = true
//trigger resubscribe flow for subscription.
apiSub.checkAndResubscribe(subId)
if apiSub.errcnt < filterSubMaxErrCnt {
apiSub.resubscribeInProgress = true
//trigger resubscribe flow for subscription.
apiSub.checkAndResubscribe(subId)
}
}
}
}
Expand Down Expand Up @@ -181,6 +188,10 @@ func (apiSub *Sub) resubscribe(failedPeer peer.ID) {
apiSub.multiplex(subs)
}

// possibleRecursiveError reports whether err matches (via errors.Is) one of
// the failures that subscribe counts towards Sub.errcnt:
// utils.ErrNoPeersAvailable or swarm.ErrDialBackoff.
//
// NOTE(review): the name suggests these are the errors that would otherwise
// keep re-triggering the resubscribe flow through the closing channel —
// confirm against checkAndResubscribe, which is not visible here.
func possibleRecursiveError(err error) bool {
	switch {
	case errors.Is(err, utils.ErrNoPeersAvailable):
		return true
	case errors.Is(err, swarm.ErrDialBackoff):
		return true
	default:
		return false
	}
}

func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.FilterSubscribeOption, 0)
Expand All @@ -195,6 +206,9 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...)

if err != nil {
if possibleRecursiveError(err) {
apiSub.errcnt++
}
//Inform of error, so that resubscribe can be triggered if required
if len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/peermanager/fastest_peer_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlic
}
}

return "", ErrNoPeersAvailable
return "", utils.ErrNoPeersAvailable
}

type pingResult struct {
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/peermanager/fastest_peer_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func TestRTT(t *testing.T) {
h3.Close()

_, err = rtt.FastestPeer(ctx, peer.IDSlice{h3.ID()})
require.ErrorIs(t, err, ErrNoPeersAvailable)
require.ErrorIs(t, err, utils.ErrNoPeersAvailable)

// H3 should never return
for i := 0; i < 100; i++ {
p, err := rtt.FastestPeer(ctx, peer.IDSlice{h2.ID(), h3.ID()})
if err != nil {
require.ErrorIs(t, err, ErrNoPeersAvailable)
require.ErrorIs(t, err, utils.ErrNoPeersAvailable)
} else {
require.NotEqual(t, h3.ID(), p)
}
Expand Down
4 changes: 0 additions & 4 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ const (
LowestRTT
)

// ErrNoPeersAvailable is emitted when no suitable peers are found for
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")

const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15
Expand Down
8 changes: 4 additions & 4 deletions waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestServiceSlots(t *testing.T) {
defer h4.Close()

_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
require.Error(t, err, ErrNoPeersAvailable)
require.Error(t, err, utils.ErrNoPeersAvailable)

// add h4 peer for protocol1
_, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestPeerSelection(t *testing.T) {
require.Equal(t, h2.ID(), peerIDs[0])

_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, ErrNoPeersAvailable, err)
require.Error(t, utils.ErrNoPeersAvailable, err)

_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestDefaultProtocol(t *testing.T) {
///////////////
//Test empty peer selection for relay protocol
_, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
require.Error(t, err, ErrNoPeersAvailable)
require.Error(t, err, utils.ErrNoPeersAvailable)

///////////////
// getting peer for default protocol
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {

pm.RemovePeer(peers[0])
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
require.Error(t, err, ErrNoPeersAvailable)
require.Error(t, err, utils.ErrNoPeersAvailable)
}

func TestConnectToRelayPeers(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions waku/v2/peermanager/peer_selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -59,7 +60,7 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
peerIDs, err := pm.selectServicePeer(criteria)
if err == nil && len(peerIDs) == criteria.MaxPeers {
return maps.Keys(peerIDs), nil
} else if !errors.Is(err, ErrNoPeersAvailable) {
} else if !errors.Is(err, utils.ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -101,7 +102,7 @@ func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error)
}
}
if len(selectedPeers) == 0 {
return nil, ErrNoPeersAvailable
return nil, utils.ErrNoPeersAvailable
}
return selectedPeers, nil
}
Expand Down Expand Up @@ -157,7 +158,7 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSe
if len(peers) == 0 {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
}
return peers, ErrNoPeersAvailable
return peers, utils.ErrNoPeersAvailable
}

// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers.
Expand Down
7 changes: 4 additions & 3 deletions waku/v2/peermanager/service_slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
"golang.org/x/exp/maps"
)

Expand All @@ -26,7 +27,7 @@ func TestServiceSlot(t *testing.T) {
slots.getPeers(protocol).remove(peerID)
//
_, err = slots.getPeers(protocol).getRandom(1, nil)
require.Equal(t, err, ErrNoPeersAvailable)
require.Equal(t, err, utils.ErrNoPeersAvailable)

// Test with more peers
peerID2 := peer.ID("peerId2")
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) {
slots.removePeer(peerID)
//
_, err = slots.getPeers(protocol).getRandom(1, nil)
require.Equal(t, err, ErrNoPeersAvailable)
require.Equal(t, err, utils.ErrNoPeersAvailable)
_, err = slots.getPeers(protocol1).getRandom(1, nil)
require.Equal(t, err, ErrNoPeersAvailable)
require.Equal(t, err, utils.ErrNoPeersAvailable)
}
6 changes: 6 additions & 0 deletions waku/v2/utils/peer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package utils

import (
"errors"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
Expand All @@ -10,6 +12,10 @@ type DialError struct {
PeerID peer.ID
}

// ErrNoPeersAvailable is the sentinel error emitted when no suitable peers
// are found for some protocol. Callers should match it with errors.Is rather
// than by comparing error strings.
var ErrNoPeersAvailable = errors.New("no suitable peers found")

chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
// GetPeerID is used to extract the peerID from a multiaddress
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
Expand Down
Loading