diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index 16f263367..020bb23f5 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -3,10 +3,12 @@ package filter import ( "context" "encoding/json" + "errors" "time" "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" @@ -29,6 +31,7 @@ func (fc FilterConfig) String() string { } const filterSubLoopInterval = 5 * time.Second +const filterSubMaxErrCnt = 3 type Sub struct { ContentFilter protocol.ContentFilter @@ -43,6 +46,7 @@ type Sub struct { onlineChecker onlinechecker.OnlineChecker resubscribeInProgress bool id string + errcnt int } type subscribeParameters struct { @@ -107,13 +111,14 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) { } } -func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) { +func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) { defer utils.LogOnPanic() - ticker := time.NewTicker(batchInterval) + ticker := time.NewTicker(loopInterval) defer ticker.Stop() for { select { case <-ticker.C: + apiSub.errcnt = 0 //reset errorCount if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers && !apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers { apiSub.closing <- "" @@ -123,9 +128,11 @@ func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) { apiSub.cleanup() return case subId := <-apiSub.closing: - apiSub.resubscribeInProgress = true - //trigger resubscribe flow for subscription. - apiSub.checkAndResubscribe(subId) + if apiSub.errcnt < filterSubMaxErrCnt { + apiSub.resubscribeInProgress = true + //trigger resubscribe flow for subscription. + apiSub.checkAndResubscribe(subId) + } } } } @@ -181,6 +188,10 @@ func (apiSub *Sub) resubscribe(failedPeer peer.ID) { apiSub.multiplex(subs) } +func possibleRecursiveError(err error) bool { + return errors.Is(err, utils.ErrNoPeersAvailable) || errors.Is(err, swarm.ErrDialBackoff) +} + func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) { // Low-level subscribe, returns a set of SubscriptionDetails options := make([]filter.FilterSubscribeOption, 0) @@ -195,6 +206,9 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...) if err != nil { + if possibleRecursiveError(err) { + apiSub.errcnt++ + } //Inform of error, so that resubscribe can be triggered if required if len(apiSub.closing) < apiSub.Config.MaxPeers { apiSub.closing <- "" diff --git a/waku/v2/peermanager/fastest_peer_selector.go b/waku/v2/peermanager/fastest_peer_selector.go index 7f2ce6ddb..65065d3f5 100644 --- a/waku/v2/peermanager/fastest_peer_selector.go +++ b/waku/v2/peermanager/fastest_peer_selector.go @@ -139,7 +139,7 @@ func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlic } } - return "", ErrNoPeersAvailable + return "", utils.ErrNoPeersAvailable } type pingResult struct { diff --git a/waku/v2/peermanager/fastest_peer_selector_test.go b/waku/v2/peermanager/fastest_peer_selector_test.go index 6576d11c6..905f514a8 100644 --- a/waku/v2/peermanager/fastest_peer_selector_test.go +++ b/waku/v2/peermanager/fastest_peer_selector_test.go @@ -34,13 +34,13 @@ func TestRTT(t *testing.T) { h3.Close() _, err = rtt.FastestPeer(ctx, peer.IDSlice{h3.ID()}) - require.ErrorIs(t, err, ErrNoPeersAvailable) + require.ErrorIs(t, err, utils.ErrNoPeersAvailable) // H3 should never return for i := 0; i < 100; i++ { p, err := rtt.FastestPeer(ctx, peer.IDSlice{h2.ID(), h3.ID()}) if err != nil { - require.ErrorIs(t, err, ErrNoPeersAvailable) + require.ErrorIs(t, err, utils.ErrNoPeersAvailable) } else { require.NotEqual(t, h3.ID(), p) } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 8e07d93c6..14cc100dd 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -98,10 +98,6 @@ const ( LowestRTT ) -// ErrNoPeersAvailable is emitted when no suitable peers are found for -// some protocol -var ErrNoPeersAvailable = errors.New("no suitable peers found") - const maxFailedAttempts = 5 const prunePeerStoreInterval = 10 * time.Minute const peerConnectivityLoopSecs = 15 diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 12dceef20..aad3b639c 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -93,7 +93,7 @@ func TestServiceSlots(t *testing.T) { defer h4.Close() _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) - require.Error(t, err, ErrNoPeersAvailable) + require.Error(t, err, utils.ErrNoPeersAvailable) // add h4 peer for protocol1 _, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) @@ -138,7 +138,7 @@ func TestPeerSelection(t *testing.T) { require.Equal(t, h2.ID(), peerIDs[0]) _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}}) - require.Error(t, ErrNoPeersAvailable, err) + require.Error(t, utils.ErrNoPeersAvailable, err) _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) @@ -175,7 +175,7 @@ func TestDefaultProtocol(t *testing.T) { /////////////// //Test empty peer selection for relay protocol _, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) - require.Error(t, err, ErrNoPeersAvailable) + require.Error(t, err, utils.ErrNoPeersAvailable) /////////////// // getting peer for default protocol @@ -215,7 +215,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { pm.RemovePeer(peers[0]) _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) - require.Error(t, err, ErrNoPeersAvailable) + require.Error(t, err, utils.ErrNoPeersAvailable) } func TestConnectToRelayPeers(t *testing.T) { diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index 8011b7e90..2d477728c 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -59,7 +60,7 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic peerIDs, err := pm.selectServicePeer(criteria) if err == nil && len(peerIDs) == criteria.MaxPeers { return maps.Keys(peerIDs), nil - } else if !errors.Is(err, ErrNoPeersAvailable) { + } else if !errors.Is(err, utils.ErrNoPeersAvailable) { pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err)) return nil, err @@ -101,7 +102,7 @@ func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) } } if len(selectedPeers) == 0 { - return nil, ErrNoPeersAvailable + return nil, utils.ErrNoPeersAvailable } return selectedPeers, nil } @@ -157,7 +158,7 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSe if len(peers) == 0 { pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err)) } - return peers, ErrNoPeersAvailable + return peers, utils.ErrNoPeersAvailable } // PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. diff --git a/waku/v2/peermanager/service_slot_test.go b/waku/v2/peermanager/service_slot_test.go index b85a2c6f2..33415f035 100644 --- a/waku/v2/peermanager/service_slot_test.go +++ b/waku/v2/peermanager/service_slot_test.go @@ -6,6 +6,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/utils" "golang.org/x/exp/maps" ) @@ -26,7 +27,7 @@ func TestServiceSlot(t *testing.T) { slots.getPeers(protocol).remove(peerID) // _, err = slots.getPeers(protocol).getRandom(1, nil) - require.Equal(t, err, ErrNoPeersAvailable) + require.Equal(t, err, utils.ErrNoPeersAvailable) // Test with more peers peerID2 := peer.ID("peerId2") @@ -74,7 +75,7 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) { slots.removePeer(peerID) // _, err = slots.getPeers(protocol).getRandom(1, nil) - require.Equal(t, err, ErrNoPeersAvailable) + require.Equal(t, err, utils.ErrNoPeersAvailable) _, err = slots.getPeers(protocol1).getRandom(1, nil) - require.Equal(t, err, ErrNoPeersAvailable) + require.Equal(t, err, utils.ErrNoPeersAvailable) } diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index b732fa149..b71677f79 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -1,6 +1,8 @@ package utils import ( + "errors" + "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" ) @@ -10,6 +12,10 @@ type DialError struct { PeerID peer.ID } +// ErrNoPeersAvailable is emitted when no suitable peers are found for +// some protocol +var ErrNoPeersAvailable = errors.New("no suitable peers found") + // GetPeerID is used to extract the peerID from a multiaddress func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)