Skip to content

Commit

Permalink
Merge branch 'main' into migrate-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Jan 30, 2023
2 parents 0cc547b + d8ac8d6 commit 3ec4c71
Show file tree
Hide file tree
Showing 8 changed files with 1,730 additions and 0 deletions.
218 changes: 218 additions & 0 deletions connecteventmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package network

import (
"sync"

"github.com/libp2p/go-libp2p/core/peer"
)

type ConnectionListener interface {
PeerConnected(peer.ID)
PeerDisconnected(peer.ID)
}

type state byte

const (
stateDisconnected = iota
stateResponsive
stateUnresponsive
)

type connectEventManager struct {
connListeners []ConnectionListener
lk sync.RWMutex
cond sync.Cond
peers map[peer.ID]*peerState

changeQueue []peer.ID
stop bool
done chan struct{}
}

type peerState struct {
newState, curState state
pending bool
}

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
evtManager := &connectEventManager{
connListeners: connListeners,
peers: make(map[peer.ID]*peerState),
done: make(chan struct{}),
}
evtManager.cond = sync.Cond{L: &evtManager.lk}
return evtManager
}

func (c *connectEventManager) Start() {
go c.worker()
}

func (c *connectEventManager) Stop() {
c.lk.Lock()
c.stop = true
c.lk.Unlock()
c.cond.Broadcast()

<-c.done
}

func (c *connectEventManager) getState(p peer.ID) state {
if state, ok := c.peers[p]; ok {
return state.newState
} else {
return stateDisconnected
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) {
state, ok := c.peers[p]
if !ok {
state = new(peerState)
c.peers[p] = state
}
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
c.changeQueue = append(c.changeQueue, p)
c.cond.Broadcast()
}
}

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
// connect event manager has been stopped.
func (c *connectEventManager) waitChange() bool {
for !c.stop && len(c.changeQueue) == 0 {
c.cond.Wait()
}
return !c.stop
}

func (c *connectEventManager) worker() {
c.lk.Lock()
defer c.lk.Unlock()
defer close(c.done)

for c.waitChange() {
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)
c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pid]
// If we've disconnected and forgotten, continue.
if !ok {
// This shouldn't be possible because _this_ thread is responsible for
// removing peers from this map, and we shouldn't get duplicate entries in
// the change queue.
log.Error("a change was enqueued for a peer we're not tracking")
continue
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
continue
}

// Or record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerDisconnected(pid)
}
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerConnected(pid)
}
c.lk.Lock()
}
}
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !responsive -> responsive

if c.getState(p) == stateResponsive {
return
}
c.setState(p, stateResponsive)
}

// Called when we drop the final connection to a peer.
func (c *connectEventManager) Disconnected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !disconnected -> disconnected

if c.getState(p) == stateDisconnected {
return
}

c.setState(p, stateDisconnected)
}

// Called whenever a peer is unresponsive.
func (c *connectEventManager) MarkUnresponsive(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// responsive -> unresponsive

if c.getState(p) != stateResponsive {
return
}

c.setState(p, stateUnresponsive)
}

// Called whenever we receive a message from a peer.
//
// - When we're connected to the peer, this will mark the peer as responsive (from unresponsive).
// - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process
//
// the "on message" event, so we can't treat this as evidence of a connection.
func (c *connectEventManager) OnMessage(p peer.ID) {
c.lk.RLock()
unresponsive := c.getState(p) == stateUnresponsive
c.lk.RUnlock()

// Only continue if both connected, and unresponsive.
if !unresponsive {
return
}

// unresponsive -> responsive

// We need to make a modification so now take a write lock
c.lk.Lock()
defer c.lk.Unlock()

// Note: state may have changed in the time between when read lock
// was released and write lock taken, so check again
if c.getState(p) != stateUnresponsive {
return
}

c.setState(p, stateResponsive)
}
175 changes: 175 additions & 0 deletions connecteventmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package network

import (
"sync"
"testing"
"time"

"github.com/ipfs/go-libipfs/bitswap/internal/testutil"
"github.com/ipfs/go-libipfs/internal/test"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)

type mockConnEvent struct {
connected bool
peer peer.ID
}

type mockConnListener struct {
sync.Mutex
events []mockConnEvent
}

func newMockConnListener() *mockConnListener {
return new(mockConnListener)
}

func (cl *mockConnListener) PeerConnected(p peer.ID) {
cl.Lock()
defer cl.Unlock()
cl.events = append(cl.events, mockConnEvent{connected: true, peer: p})
}

func (cl *mockConnListener) PeerDisconnected(p peer.ID) {
cl.Lock()
defer cl.Unlock()
cl.events = append(cl.events, mockConnEvent{connected: false, peer: p})
}

func wait(t *testing.T, c *connectEventManager) {
require.Eventually(t, func() bool {
c.lk.RLock()
defer c.lk.RUnlock()
return len(c.changeQueue) == 0
}, time.Second, time.Millisecond, "connection event manager never processed events")
}

func TestConnectEventManagerConnectDisconnect(t *testing.T) {
test.Flaky(t)

connListener := newMockConnListener()
peers := testutil.GeneratePeers(2)
cem := newConnectEventManager(connListener)
cem.Start()
t.Cleanup(cem.Stop)

var expectedEvents []mockConnEvent

// Connect A twice, should only see one event
cem.Connected(peers[0])
cem.Connected(peers[0])
expectedEvents = append(expectedEvents, mockConnEvent{
peer: peers[0],
connected: true,
})

// Flush the event queue.
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Block up the event loop.
connListener.Lock()
cem.Connected(peers[1])
expectedEvents = append(expectedEvents, mockConnEvent{
peer: peers[1],
connected: true,
})

// We don't expect this to show up.
cem.Disconnected(peers[0])
cem.Connected(peers[0])

connListener.Unlock()

wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)
}

func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
test.Flaky(t)

connListener := newMockConnListener()
p := testutil.GeneratePeers(1)[0]
cem := newConnectEventManager(connListener)
cem.Start()
t.Cleanup(cem.Stop)

var expectedEvents []mockConnEvent

// Don't mark as connected when we receive a message (could have been delayed).
cem.OnMessage(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Handle connected event.
cem.Connected(p)
wait(t, cem)

expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
connected: true,
})
require.Equal(t, expectedEvents, connListener.events)

// Becomes unresponsive.
cem.MarkUnresponsive(p)
wait(t, cem)

expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
connected: false,
})
require.Equal(t, expectedEvents, connListener.events)

// We have a new connection, mark them responsive.
cem.Connected(p)
wait(t, cem)
expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
connected: true,
})
require.Equal(t, expectedEvents, connListener.events)

// No duplicate event.
cem.OnMessage(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)
}

func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
test.Flaky(t)

connListener := newMockConnListener()
p := testutil.GeneratePeers(1)[0]
cem := newConnectEventManager(connListener)
cem.Start()
t.Cleanup(cem.Stop)

var expectedEvents []mockConnEvent

// Handle connected event.
cem.Connected(p)
wait(t, cem)

expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
connected: true,
})
require.Equal(t, expectedEvents, connListener.events)

// Becomes unresponsive.
cem.MarkUnresponsive(p)
wait(t, cem)

expectedEvents = append(expectedEvents, mockConnEvent{
peer: p,
connected: false,
})
require.Equal(t, expectedEvents, connListener.events)

cem.Disconnected(p)
wait(t, cem)
require.Empty(t, cem.peers) // all disconnected
require.Equal(t, expectedEvents, connListener.events)
}
Loading

0 comments on commit 3ec4c71

Please sign in to comment.