misc(share/p2p): reduce frequency of discovery retries #3561

Open · wants to merge 9 commits into base: main
10 changes: 2 additions & 8 deletions share/p2p/discovery/discovery.go
@@ -28,17 +28,11 @@ const (
// findPeersTimeout limits the FindPeers operation in time
findPeersTimeout = time.Minute

// retryTimeout defines time interval between discovery and advertise attempts.
retryTimeout = time.Second

// logInterval defines the time interval at which a warning message will be logged
// if the desired number of nodes is not detected.
logInterval = 5 * time.Minute
)

// discoveryRetryTimeout defines time interval between discovery attempts, needed for tests
var discoveryRetryTimeout = retryTimeout

// Discovery combines advertise and discover services and allows storing discovered nodes.
// TODO: The code here gets horribly hairy, so we should refactor this at some point
type Discovery struct {
@@ -181,7 +175,7 @@ func (d *Discovery) Advertise(ctx context.Context) {

// we don't want to retry indefinitely in a busy loop
// the internal discovery mechanism may need some time before further attempts
errTimer := time.NewTimer(retryTimeout)
errTimer := time.NewTimer(d.params.AdvertiseRetryTimeout)
select {
case <-errTimer.C:
errTimer.Stop()
@@ -212,7 +206,7 @@ func (d *Discovery) Advertise(ctx context.Context) {
// It initiates peer discovery upon request and restarts the process until the soft limit is
// reached.
func (d *Discovery) discoveryLoop(ctx context.Context) {
t := time.NewTicker(discoveryRetryTimeout)
t := time.NewTicker(d.params.DiscoveryRetryTimeout)
defer t.Stop()

warnTicker := time.NewTicker(logInterval)
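For reference, the two hunks above swap the package-level retryTimeout and discoveryRetryTimeout values for the per-instance d.params.AdvertiseRetryTimeout and d.params.DiscoveryRetryTimeout. Below is a minimal sketch of the ticker-driven retry pattern the new parameter feeds; the helper names (runDiscoveryLoop, findPeers) are illustrative and not part of the PR.

package main

import (
	"context"
	"fmt"
	"time"
)

// runDiscoveryLoop retries findPeers on a fixed, configurable interval until
// the context is cancelled, mirroring how discoveryLoop now ticks on
// d.params.DiscoveryRetryTimeout instead of a package-level variable.
func runDiscoveryLoop(ctx context.Context, retryTimeout time.Duration, findPeers func(context.Context) error) {
	t := time.NewTicker(retryTimeout)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			if err := findPeers(ctx); err != nil {
				// on failure, simply wait for the next tick; the interval is
				// now tunable per Discovery instance (e.g. shortened in tests)
				fmt.Println("find peers:", err)
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	runDiscoveryLoop(ctx, 100*time.Millisecond, func(context.Context) error {
		fmt.Println("discovery attempt")
		return nil
	})
}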
6 changes: 2 additions & 4 deletions share/p2p/discovery/discovery_test.go
@@ -26,8 +26,6 @@ const (
func TestDiscovery(t *testing.T) {
const nodes = 10 // higher number brings higher coverage

discoveryRetryTimeout = time.Millisecond * 100 // defined in discovery.go

ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
t.Cleanup(cancel)

@@ -43,7 +41,7 @@ func TestDiscovery(t *testing.T) {
}

host, routingDisc := tn.peer()
params := DefaultParameters()
params := TestParameters()
params.PeersLimit = nodes

// start discovery listener service for peerA
@@ -103,7 +101,7 @@ func TestDiscoveryTagged(t *testing.T) {
// sub will discover both peers, but on different tags
sub, routingDisc := tn.peer()

params := DefaultParameters()
params := TestParameters()

// create 2 discovery services for sub, each with a different tag
done1 := make(chan struct{})
31 changes: 29 additions & 2 deletions share/p2p/discovery/options.go
@@ -15,6 +15,13 @@ type Parameters struct {
// AdvertiseInterval is an interval between advertising sessions.
// NOTE: only full and bridge can advertise themselves.
AdvertiseInterval time.Duration

// AdvertiseRetryTimeout defines the time interval between advertise attempts.
AdvertiseRetryTimeout time.Duration

// DiscoveryRetryTimeout defines the time interval between discovery attempts;
// it is set independently for tests in discovery_test.go
DiscoveryRetryTimeout time.Duration
}

// options is the set of options that can be configured for the Discovery module
@@ -33,13 +40,33 @@ type Option func(*options)
// for the Discovery module
func DefaultParameters() *Parameters {
return &Parameters{
PeersLimit: 5,
AdvertiseInterval: time.Hour,
PeersLimit: 5,
AdvertiseInterval: time.Hour,
AdvertiseRetryTimeout: time.Second,
DiscoveryRetryTimeout: time.Second * 60,
}
}

// TestParameters returns the default Parameters' configuration values
// for the Discovery module, adjusted for configuration during tests
func TestParameters() *Parameters {
p := DefaultParameters()
p.AdvertiseInterval = time.Second * 1
p.DiscoveryRetryTimeout = time.Millisecond * 50
return p
}

// Validate validates the values in Parameters
func (p *Parameters) Validate() error {
if p.AdvertiseRetryTimeout <= 0 {
return fmt.Errorf("discovery: advertise retry timeout cannot be zero or negative")
}

if p.DiscoveryRetryTimeout <= 0 {
return fmt.Errorf("discovery: discovery retry timeout cannot be zero or negative")
}
Comment on lines +62 to +68
Member: Isn't this config breaking? If so, let's mark PR as such.


if p.PeersLimit <= 0 {
return fmt.Errorf("discovery: peers limit cannot be zero or negative")
}
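Putting the options.go changes together: both retry intervals now live on Parameters and are validated alongside the existing fields. Below is a hedged usage sketch that assumes only the identifiers visible in this diff (DefaultParameters, TestParameters, Validate and the Parameters fields); the override values chosen here are illustrative.

package main

import (
	"fmt"
	"time"

	"github.com/celestiaorg/celestia-node/share/p2p/discovery"
)

func main() {
	// production-style config: start from the defaults (1s advertise retry,
	// 60s discovery retry) and override only what is needed
	params := discovery.DefaultParameters()
	params.DiscoveryRetryTimeout = 30 * time.Second

	if err := params.Validate(); err != nil {
		fmt.Println("invalid discovery params:", err)
		return
	}

	// test-style config: TestParameters shortens the intervals
	// (1s advertise interval, 50ms discovery retry) so tests converge quickly
	testParams := discovery.TestParameters()
	testParams.PeersLimit = 10

	fmt.Println(params, testParams)
}

Compared with the previous approach of overwriting the package-level discoveryRetryTimeout from the test file, this keeps test-only tuning on the instance's Parameters rather than in shared package state.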
78 changes: 78 additions & 0 deletions share/p2p/discovery/options_test.go
@@ -0,0 +1,78 @@
package discovery

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestValidate(t *testing.T) {
tests := []struct {
name string
params Parameters
wantErr bool
}{
{
name: "valid parameters",
params: Parameters{
PeersLimit: 5,
AdvertiseInterval: time.Hour,
AdvertiseRetryTimeout: time.Second,
DiscoveryRetryTimeout: time.Minute,
},
wantErr: false,
},
{
name: "negative PeersLimit",
params: Parameters{
PeersLimit: 0,
AdvertiseInterval: time.Hour,
AdvertiseRetryTimeout: time.Second,
DiscoveryRetryTimeout: time.Minute,
},
wantErr: true,
},
{
name: "negative AdvertiseInterval",
params: Parameters{
PeersLimit: 5,
AdvertiseInterval: -time.Hour,
AdvertiseRetryTimeout: time.Second,
DiscoveryRetryTimeout: time.Minute,
},
wantErr: true,
},
{
name: "negative AdvertiseRetryTimeout",
params: Parameters{
PeersLimit: 5,
AdvertiseInterval: time.Hour,
AdvertiseRetryTimeout: -time.Second,
DiscoveryRetryTimeout: time.Minute,
},
wantErr: true,
},
{
name: "negative DiscoveryRetryTimeout",
params: Parameters{
PeersLimit: 5,
AdvertiseInterval: time.Hour,
AdvertiseRetryTimeout: time.Second,
DiscoveryRetryTimeout: -time.Minute,
},
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.params.Validate()
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
29 changes: 17 additions & 12 deletions share/p2p/peers/manager_test.go
@@ -26,9 +26,13 @@ import (
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

const (
defaultTimeout = time.Second * 5
)

func TestManager(t *testing.T) {
t.Run("Validate pool by headerSub", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
@@ -49,7 +53,7 @@ func TestManager(t *testing.T) {
})

t.Run("Validate pool by shrex.Getter", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

h := testHeader()
@@ -72,7 +76,7 @@ func TestManager(t *testing.T) {
})

t.Run("validator", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
@@ -108,7 +112,7 @@ func TestManager(t *testing.T) {
})

t.Run("cleanup", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
@@ -153,7 +157,7 @@ func TestManager(t *testing.T) {
})

t.Run("no peers from shrex.Sub, get from discovery", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
@@ -176,7 +180,7 @@ func TestManager(t *testing.T) {
})

t.Run("no peers from shrex.Sub and from discovery. Wait", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// create headerSub mock
@@ -218,7 +222,7 @@ func TestManager(t *testing.T) {
})

t.Run("shrexSub sends a message lower than first headerSub header height, headerSub first", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

h := testHeader()
@@ -251,7 +255,7 @@ func TestManager(t *testing.T) {
})

t.Run("shrexSub sends a message lower than first headerSub header height, shrexSub first", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

h := testHeader()
@@ -289,7 +293,7 @@ func TestManager(t *testing.T) {
})

t.Run("pools store window", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

h := testHeader()
@@ -322,7 +326,7 @@ func TestIntegration(t *testing.T) {
t.Run("get peer from shrexsub", func(t *testing.T) {
nw, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

bnPubSub, err := shrexsub.NewPubSub(ctx, nw.Hosts()[0], "test")
@@ -365,7 +369,7 @@ func TestIntegration(t *testing.T) {
fullNodesTag := "fullNodes"
nw, err := mocknet.FullMeshConnected(3)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
t.Cleanup(cancel)

// set up bootstrapper
@@ -393,6 +397,7 @@ func TestIntegration(t *testing.T) {

params := discovery.DefaultParameters()
params.AdvertiseInterval = time.Second
params.DiscoveryRetryTimeout = time.Millisecond * 100

bnDisc, err := discovery.NewDiscovery(
params,
@@ -426,7 +431,7 @@ func TestIntegration(t *testing.T) {
}

// set up discovery for full node with hook to peer manager and check discovered peer
params = discovery.DefaultParameters()
params = discovery.TestParameters()
params.AdvertiseInterval = time.Second
params.PeersLimit = 10
