Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client] Limit P2P attempts and restart on specific events #2657

Merged
merged 14 commits into from
Oct 8, 2024
58 changes: 33 additions & 25 deletions client/internal/peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ const (
connPriorityRelay ConnPriority = 1
connPriorityICETurn ConnPriority = 1
connPriorityICEP2P ConnPriority = 2

reconnectMaxElapsedTime = 30 * time.Minute
candidatesMonitorPeriod = 5 * time.Minute
candidatedGatheringTimeout = 5 * time.Second
signalerMonitorPeriod = 5 * time.Second
)

type WgConfig struct {
Expand Down Expand Up @@ -82,6 +87,7 @@ type Conn struct {
wgProxyICE wgproxy.Proxy
wgProxyRelay wgproxy.Proxy
signaler *Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover
relayManager *relayClient.Manager
allowedIPsIP string
handshaker *Handshaker
Expand All @@ -107,6 +113,10 @@ type Conn struct {
// for reconnection operations
iCEDisconnected chan bool
relayDisconnected chan bool
reconnectCh chan struct{}

currentCandidates []ice.Candidate
candidatesMu sync.Mutex
}

// NewConn creates a new not opened Conn to the remote peer.
Expand All @@ -122,19 +132,22 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
connLog := log.WithField("peer", config.Key)

var conn = &Conn{
log: connLog,
ctx: ctx,
ctxCancel: ctxCancel,
config: config,
statusRecorder: statusRecorder,
wgProxyFactory: wgProxyFactory,
signaler: signaler,
relayManager: relayManager,
allowedIPsIP: allowedIPsIP.String(),
statusRelay: NewAtomicConnStatus(),
statusICE: NewAtomicConnStatus(),
log: connLog,
ctx: ctx,
ctxCancel: ctxCancel,
config: config,
statusRecorder: statusRecorder,
wgProxyFactory: wgProxyFactory,
signaler: signaler,
iFaceDiscover: iFaceDiscover,
relayManager: relayManager,
allowedIPsIP: allowedIPsIP.String(),
statusRelay: NewAtomicConnStatus(),
statusICE: NewAtomicConnStatus(),

iCEDisconnected: make(chan bool, 1),
relayDisconnected: make(chan bool, 1),
reconnectCh: make(chan struct{}, 1),
}

rFns := WorkerRelayCallbacks{
Expand Down Expand Up @@ -308,12 +321,16 @@ func (conn *Conn) reconnectLoopWithRetry() {
// With it, we can decrease to send necessary offer
select {
case <-conn.ctx.Done():
return
case <-time.After(3 * time.Second):
}

go conn.monitorReconnectEvents()

ticker := conn.prepareExponentTicker()
defer ticker.Stop()
time.Sleep(1 * time.Second)

for {
select {
case t := <-ticker.C:
Expand Down Expand Up @@ -341,20 +358,11 @@ func (conn *Conn) reconnectLoopWithRetry() {
if err != nil {
conn.log.Errorf("failed to do handshake: %v", err)
}
case changed := <-conn.relayDisconnected:
if !changed {
continue
}
conn.log.Debugf("Relay state changed, reset reconnect timer")
ticker.Stop()
ticker = conn.prepareExponentTicker()
case changed := <-conn.iCEDisconnected:
if !changed {
continue
}
conn.log.Debugf("ICE state changed, reset reconnect timer")

case <-conn.reconnectCh:
ticker.Stop()
ticker = conn.prepareExponentTicker()

case <-conn.ctx.Done():
conn.log.Debugf("context is done, stop reconnect loop")
return
Expand All @@ -365,10 +373,10 @@ func (conn *Conn) reconnectLoopWithRetry() {
func (conn *Conn) prepareExponentTicker() *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0.01,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: conn.config.Timeout,
MaxElapsedTime: 0,
MaxElapsedTime: reconnectMaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, conn.ctx)
Expand Down
181 changes: 181 additions & 0 deletions client/internal/peer/conn_monitor.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to move this into a separate structure? I do not see any dependency on the conn that is not exchangeable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sonar will complain :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for what?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That the file is too large

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can not follow you. If you move everything from conn_monitor.go to monitor.go and in it create a

type ConnMonitor struct {
}

Then we get a more cleanly separated code structure that is easier to cover with unit tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have moved it to a separate struct. But not its own package yet because that requires more refactoring on the other components

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks nice! I will review it again.

Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package peer

import (
"context"
"fmt"
"time"

"github.com/pion/ice/v3"
log "github.com/sirupsen/logrus"
)

// monitorReconnectEvents fans several reconnect triggers — relay/ICE state
// changes, the signaler becoming ready, and local candidate changes — into a
// single non-blocking signal on conn.reconnectCh. Runs until conn.ctx is done.
func (conn *Conn) monitorReconnectEvents() {
	signalerReady := make(chan struct{}, 1)
	go conn.monitorSignalerReady(signalerReady)

	candidatesChanged := make(chan struct{}, 1)
	go conn.monitorLocalCandidatesChanged(candidatesChanged)

	for {
		select {
		case stateChanged := <-conn.relayDisconnected:
			if stateChanged {
				conn.log.Debugf("Relay state changed, triggering reconnect")
				conn.triggerReconnect()
			}

		case stateChanged := <-conn.iCEDisconnected:
			if stateChanged {
				conn.log.Debugf("ICE state changed, triggering reconnect")
				conn.triggerReconnect()
			}

		case <-signalerReady:
			conn.log.Debugf("Signaler became ready, triggering reconnect")
			conn.triggerReconnect()

		case <-candidatesChanged:
			conn.log.Debugf("Local candidates changed, triggering reconnect")
			conn.triggerReconnect()

		case <-conn.ctx.Done():
			return
		}
	}
}

// monitorSignalerReady monitors the signaler ready state and triggers reconnect when it transitions from not ready to ready
func (conn *Conn) monitorSignalerReady(signalerReady chan<- struct{}) {
ticker := time.NewTicker(signalerMonitorPeriod)
defer ticker.Stop()

lastReady := true
for {
select {
case <-ticker.C:
currentReady := conn.signaler.Ready()
if !lastReady && currentReady {
select {
case signalerReady <- struct{}{}:
default:
}
}
lastReady = currentReady
case <-conn.ctx.Done():
return
}
}
}

// monitorLocalCandidatesChanged monitors the local candidates and triggers reconnect when they change
func (conn *Conn) monitorLocalCandidatesChanged(localCandidatesChanged chan<- struct{}) {
// TODO: make this global and not per-conn

ufrag, pwd, err := generateICECredentials()
if err != nil {
conn.log.Warnf("Failed to generate ICE credentials: %v", err)
return
}

ticker := time.NewTicker(candidatesMonitorPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := conn.handleCandidateTick(localCandidatesChanged, ufrag, pwd); err != nil {
conn.log.Warnf("Failed to handle candidate tick: %v", err)
}
case <-conn.ctx.Done():
return
}
}
}

func (conn *Conn) handleCandidateTick(localCandidatesChanged chan<- struct{}, ufrag string, pwd string) error {
conn.log.Debugf("Gathering ICE candidates")

transportNet, err := newStdNet(conn.iFaceDiscover, conn.config.ICEConfig.InterfaceBlackList)
if err != nil {
conn.log.Errorf("failed to create pion's stdnet: %s", err)
}

agent, err := newAgent(conn.config, transportNet, candidateTypesP2P(), ufrag, pwd)
if err != nil {
return fmt.Errorf("create ICE agent: %w", err)
}
defer func() {
if err := agent.Close(); err != nil {
conn.log.Warnf("Failed to close ICE agent: %v", err)
}
}()

gatherDone := make(chan struct{})
err = agent.OnCandidate(func(c ice.Candidate) {
log.Debugf("Got candidate: %v", c)
lixmal marked this conversation as resolved.
Show resolved Hide resolved
if c == nil {
close(gatherDone)
}
})
if err != nil {
return fmt.Errorf("set ICE candidate handler: %w", err)
}

if err := agent.GatherCandidates(); err != nil {
return fmt.Errorf("gather ICE candidates: %w", err)
}

ctx, cancel := context.WithTimeout(conn.ctx, candidatedGatheringTimeout)
defer cancel()

select {
case <-ctx.Done():
return fmt.Errorf("wait for gathering: %w", ctx.Err())
case <-gatherDone:
}

candidates, err := agent.GetLocalCandidates()
if err != nil {
return fmt.Errorf("get local candidates: %w", err)
}
log.Debugf("Got candidates: %v", candidates)
lixmal marked this conversation as resolved.
Show resolved Hide resolved

if changed := conn.updateCandidates(candidates); changed {
select {
case localCandidatesChanged <- struct{}{}:
default:
}
}

return nil
}

func (conn *Conn) updateCandidates(newCandidates []ice.Candidate) bool {
conn.candidatesMu.Lock()
defer conn.candidatesMu.Unlock()

if len(conn.currentCandidates) != len(newCandidates) {
conn.currentCandidates = newCandidates
return true
}

for i, candidate := range conn.currentCandidates {
if candidate.String() != newCandidates[i].String() {
lixmal marked this conversation as resolved.
Show resolved Hide resolved
conn.currentCandidates = newCandidates
return true
}
}

return false
}

// triggerReconnect signals the reconnect loop without blocking; if a
// reconnect request is already pending on the buffered channel, the new
// request is dropped (one pending signal is sufficient).
func (conn *Conn) triggerReconnect() {
	select {
	case conn.reconnectCh <- struct{}{}:
	default:
	}
}
4 changes: 2 additions & 2 deletions client/internal/peer/stdnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import (
"github.com/netbirdio/netbird/client/internal/stdnet"
)

func (w *WorkerICE) newStdNet() (*stdnet.Net, error) {
return stdnet.NewNet(w.config.ICEConfig.InterfaceBlackList)
// newStdNet builds a stdnet.Net honoring the interface blacklist. The
// external interface discover argument is deliberately ignored on this
// platform; it is only needed by the Android variant (stdnet_android.go).
func newStdNet(_ stdnet.ExternalIFaceDiscover, ifaceBlacklist []string) (*stdnet.Net, error) {
	return stdnet.NewNet(ifaceBlacklist)
}
4 changes: 2 additions & 2 deletions client/internal/peer/stdnet_android.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package peer

import "github.com/netbirdio/netbird/client/internal/stdnet"

func (w *WorkerICE) newStdNet() (*stdnet.Net, error) {
return stdnet.NewNetWithDiscover(w.iFaceDiscover, w.config.ICEConfig.InterfaceBlackList)
// newStdNet builds a stdnet.Net using the externally provided interface
// discover, which Android requires to enumerate network interfaces.
func newStdNet(iFaceDiscover stdnet.ExternalIFaceDiscover, ifaceBlacklist []string) (*stdnet.Net, error) {
	return stdnet.NewNetWithDiscover(iFaceDiscover, ifaceBlacklist)
}
Loading
Loading