From 11515f0717dee617863ebdb0b1678307b280ec9c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 29 Jan 2024 14:44:25 -0800 Subject: [PATCH] Add a "transient" network connectivity state Previously, we'd consider "transiently" connected peers to be connected. This meant: 1. We wouldn't fire a second event when transitioning to "really connected". The only option for users was to listen on the old-style per-connection notifications. 2. "Connectedness" checks would be a little too eager to treat a peer as connected. For 99% of users, "transient" peers should be treated as disconnected. So while it's technically a breaking change to split-out "transient" connectivity into a separate state, I expect it's more likely to fix bugs than anything. Unfortunately, this change _did_ require several changes to go-libp2p itself because go-libp2p _does_ care about transient connections: 1. We want to keep peerstore information for transient peers. 2. We may sometimes want to treat peers as "connected" in the host. 3. Identify still needs to run over transient connections. fixes #2692 --- core/network/network.go | 5 ++- p2p/host/basic/basic_host.go | 4 ++- p2p/host/pstoremanager/pstoremanager.go | 13 ++++---- p2p/host/routed/routed.go | 4 ++- p2p/net/swarm/swarm.go | 42 ++++++++++++++++++++----- p2p/protocol/identify/id.go | 17 +++++----- 6 files changed, 61 insertions(+), 24 deletions(-) diff --git a/core/network/network.go b/core/network/network.go index 66b0a1cd34..9148fc3309 100644 --- a/core/network/network.go +++ b/core/network/network.go @@ -61,10 +61,13 @@ const ( // CannotConnect means recently attempted connecting but failed to connect. // (should signal "made effort, failed") CannotConnect + + // Transient means we have a transient connection to the peer, but aren't fully connected. + Transient ) func (c Connectedness) String() string { - str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect"} + str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect", "Transient"} if c < 0 || int(c) >= len(str) { return unrecognized } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 8e6e8efe7c..5c75ae751b 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -723,8 +723,10 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error { h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) forceDirect, _ := network.GetForceDirectDial(ctx) + canUseTransient, _ := network.GetUseTransient(ctx) if !forceDirect { - if h.Network().Connectedness(pi.ID) == network.Connected { + connectedness := rh.Network().Connectedness(pi.ID) + if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) { return nil } } diff --git a/p2p/host/pstoremanager/pstoremanager.go b/p2p/host/pstoremanager/pstoremanager.go index 2a22b2caee..82c55e8cbf 100644 --- a/p2p/host/pstoremanager/pstoremanager.go +++ b/p2p/host/pstoremanager/pstoremanager.go @@ -103,15 +103,16 @@ func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscriptio ev := e.(event.EvtPeerConnectednessChanged) p := ev.Peer switch ev.Connectedness { - case network.NotConnected: + case network.Connected, network.Transient: + // If we reconnect to the peer before we've cleared the information, + // keep it. This is an optimization to keep the disconnected map + // small. We still need to check that a peer is actually + // disconnected before removing it from the peer store. + delete(disconnected, p) + default: if _, ok := disconnected[p]; !ok { disconnected[p] = time.Now() } - case network.Connected: - // If we reconnect to the peer before we've cleared the information, keep it. - // This is an optimization to keep the disconnected map small. - // We still need to check that a peer is actually disconnected before removing it from the peer store. - delete(disconnected, p) } case <-ticker.C: now := time.Now() diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index eb8e58ee7f..a6e43703c9 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -48,8 +48,10 @@ func Wrap(h host.Host, r Routing) *RoutedHost { func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error { // first, check if we're already connected unless force direct dial. forceDirect, _ := network.GetForceDirectDial(ctx) + canUseTransient, _ := network.GetUseTransient(ctx) if !forceDirect { - if rh.Network().Connectedness(pi.ID) == network.Connected { + connectedness := rh.Network().Connectedness(pi.ID) + if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) { return nil } } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index a76edce6ce..935c29626c 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -344,6 +344,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, } stat.Direction = dir stat.Opened = time.Now() + isTransient := stat.Transient // Wrap and register the connection. c := &Conn{ @@ -383,8 +384,9 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, return nil, ErrSwarmClosed } + oldState := s.connectednessUnlocked(p) + c.streams.m = make(map[*Stream]struct{}) - isFirstConnection := len(s.conns.m[p]) == 0 s.conns.m[p] = append(s.conns.m[p], c) // Add two swarm refs: @@ -397,8 +399,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.notifyLk.Lock() s.conns.Unlock() - // Notify goroutines waiting for a direct connection - if !c.Stat().Transient { + newState := network.Transient + if !isTransient { + newState = network.Connected + + // Notify goroutines waiting for a direct connection + // // Go routines interested in waiting for direct connection first acquire this lock // and then acquire s.conns.RLock. Do not acquire this lock before conns.Unlock to // prevent deadlock. @@ -412,10 +418,10 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, // Emit event after releasing `s.conns` lock so that a consumer can still // use swarm methods that need the `s.conns` lock. - if isFirstConnection { + if oldState != newState { s.emitter.Emit(event.EvtPeerConnectednessChanged{ Peer: p, - Connectedness: network.Connected, + Connectedness: newState, }) } @@ -646,10 +652,30 @@ func isDirectConn(c *Conn) bool { // To check if we have an open connection, use `s.Connectedness(p) == // network.Connected`. func (s *Swarm) Connectedness(p peer.ID) network.Connectedness { - if s.bestConnToPeer(p) != nil { - return network.Connected + s.conns.RLock() + defer s.conns.RUnlock() + + s.connectednessUnlocked(p) +} + +func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness { + var haveTransient bool + for _, c := range s.conns.m[p] { + if c.conn.IsClosed() { + // We *will* garbage collect this soon anyways. + continue + } + if c.Stat().Transient { + haveTransient = true + } else { + return network.Connected + } + } + if haveTransient { + return network.Transient + } else { + return network.NotConnected } - return network.NotConnected } // Conns returns a slice of all connections. diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 6c07dbdc8e..e3a36843f3 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -746,7 +746,8 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo // Taking the lock ensures that we don't concurrently process a disconnect. ids.addrMu.Lock() ttl := peerstore.RecentlyConnectedAddrTTL - if ids.Host.Network().Connectedness(p) == network.Connected { + switch ids.Host.Network().Connectedness(p) { + case network.Transient, network.Connected: ttl = peerstore.ConnectedAddrTTL } @@ -975,13 +976,15 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) { delete(ids.conns, c) ids.connsMu.Unlock() - if ids.Host.Network().Connectedness(c.RemotePeer()) != network.Connected { - // Last disconnect. - // Undo the setting of addresses to peer.ConnectedAddrTTL we did - ids.addrMu.Lock() - defer ids.addrMu.Unlock() - ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL) + switch ids.Host.Network().Connectedness(c.RemotePeer()) { + case network.Connected, network.Transient: + return } + // Last disconnect. + // Undo the setting of addresses to peer.ConnectedAddrTTL we did + ids.addrMu.Lock() + defer ids.addrMu.Unlock() + ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL) } func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}