diff --git a/p2p/handle.go b/p2p/handle.go index 9d018390b..4726a4906 100644 --- a/p2p/handle.go +++ b/p2p/handle.go @@ -510,7 +510,10 @@ func (me *Peer) handlePeerMessage(peerId crypto.Hash, msg *PeerMessage) error { } nbrs := me.GetNeighbors(peerId) for _, peer := range nbrs { - peer.syncRing.Offer(msg.Graph) + select { + case peer.syncRing <- msg.Graph: + default: + } } return nil case PeerMessageTypeTransactionRequest: diff --git a/p2p/peer.go b/p2p/peer.go index cdd40f1c6..ddb94587f 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -4,14 +4,12 @@ import ( "context" "fmt" "net" - "slices" "sync" "time" "github.com/MixinNetwork/mixin/config" "github.com/MixinNetwork/mixin/crypto" "github.com/MixinNetwork/mixin/logger" - "github.com/MixinNetwork/mixin/util" ) type Peer struct { @@ -26,9 +24,9 @@ type Peer struct { relayers *neighborMap consumers *neighborMap snapshotsCaches *confirmMap - highRing *util.RingBuffer - normalRing *util.RingBuffer - syncRing *util.RingBuffer + highRing chan *ChanMsg + normalRing chan *ChanMsg + syncRing chan []*SyncPoint closing bool ops chan struct{} stn chan struct{} @@ -109,15 +107,7 @@ func (me *Peer) connectRelayer(relayer *Peer) error { func (me *Peer) Neighbors() []*Peer { relayers := me.relayers.Slice() consumers := me.consumers.Slice() - for _, c := range consumers { - if slices.ContainsFunc(relayers, func(p *Peer) bool { - return p.IdForNetwork == c.IdForNetwork - }) { - continue - } - relayers = append(relayers, c) - } - return relayers + return append(relayers, consumers...) } func (p *Peer) disconnect() { @@ -125,10 +115,10 @@ func (p *Peer) disconnect() { return } p.closing = true - p.highRing.Dispose() - p.normalRing.Dispose() - p.syncRing.Dispose() <-p.ops + close(p.highRing) + close(p.normalRing) + close(p.syncRing) <-p.stn } @@ -145,17 +135,14 @@ func (me *Peer) Metric() map[string]*MetricPool { func NewPeer(handle SyncHandle, idForNetwork crypto.Hash, addr string, isRelayer bool) *Peer { ringSize := uint64(1024) - if isRelayer { - ringSize = ringSize * MaxIncomingStreams - } peer := &Peer{ IdForNetwork: idForNetwork, Address: addr, relayers: &neighborMap{m: make(map[crypto.Hash]*Peer)}, consumers: &neighborMap{m: make(map[crypto.Hash]*Peer)}, - highRing: util.NewRingBuffer(ringSize), - normalRing: util.NewRingBuffer(ringSize), - syncRing: util.NewRingBuffer(ringSize), + highRing: make(chan *ChanMsg, ringSize), + normalRing: make(chan *ChanMsg, ringSize), + syncRing: make(chan []*SyncPoint, 128), handle: handle, sentMetric: &MetricPool{enabled: false}, receivedMetric: &MetricPool{enabled: false}, @@ -175,9 +162,9 @@ func (me *Peer) Teardown() { if me.relayer != nil { me.relayer.Close() } - me.highRing.Dispose() - me.normalRing.Dispose() - me.syncRing.Dispose() + close(me.highRing) + close(me.normalRing) + close(me.syncRing) peers := me.Neighbors() var wg sync.WaitGroup for _, p := range peers { @@ -211,8 +198,7 @@ func (me *Peer) ListenConsumers() error { if !p.isRelayer { continue } - key := crypto.Blake3Hash(append(msg, p.IdForNetwork[:]...)) - me.sendToPeer(p.IdForNetwork, PeerMessageTypeConsumers, key[:], msg, MsgPriorityNormal) + me.offerToPeerWithCacheCheck(p, MsgPriorityNormal, &ChanMsg{nil, msg}) } <-ticker.C @@ -261,34 +247,9 @@ func (me *Peer) loopSendingStream(p *Peer, consumer Client) (*ChanMsg, error) { defer consumer.Close("loopSendingStream") for !me.closing && !p.closing { - msgs := []*ChanMsg{} - for len(msgs) < 16 { - item, err := p.highRing.Poll(false) - if err != nil { - return nil, fmt.Errorf("peer.highRing(%s) => %v", p.IdForNetwork, err) - } else if item == nil { - break - } - msg := item.(*ChanMsg) - if me.snapshotsCaches.contains(msg.key, time.Minute) { - continue - } - msgs = append(msgs, msg) - } - - for len(msgs) < 32 { - item, err := p.normalRing.Poll(false) - if err != nil { - return nil, fmt.Errorf("peer.normalRing(%s) => %v", p.IdForNetwork, err) - } else if item == nil { - break - } - msg := item.(*ChanMsg) - if me.snapshotsCaches.contains(msg.key, time.Minute) { - continue - } - msgs = append(msgs, msg) - } + hm := me.pollRingWithCache(p.highRing, 16) + nm := me.pollRingWithCache(p.normalRing, 16) + msgs := append(hm, nm...) if len(msgs) == 0 { time.Sleep(300 * time.Millisecond) @@ -309,6 +270,22 @@ func (me *Peer) loopSendingStream(p *Peer, consumer Client) (*ChanMsg, error) { return nil, fmt.Errorf("PEER DONE") } +func (me *Peer) pollRingWithCache(ring chan *ChanMsg, limit int) []*ChanMsg { + var msgs []*ChanMsg + for len(msgs) < limit { + select { + case msg := <-ring: + if me.snapshotsCaches.contains(msg.key, time.Minute) { + continue + } + msgs = append(msgs, msg) + default: + return msgs + } + } + return msgs +} + func (me *Peer) loopReceiveMessage(peer *Peer, client Client) { logger.Printf("me.loopReceiveMessage(%s, %s)", me.Address, client.RemoteAddr().String()) receive := make(chan *PeerMessage, 1024) @@ -408,13 +385,24 @@ func (me *Peer) offerToPeerWithCacheCheck(p *Peer, priority int, msg *ChanMsg) b } func (p *Peer) offer(priority int, msg *ChanMsg) bool { + if p.closing { + return false + } switch priority { case MsgPriorityNormal: - s, err := p.normalRing.Offer(msg) - return s && err == nil + select { + case p.normalRing <- msg: + return true + default: + return false + } case MsgPriorityHigh: - s, err := p.highRing.Offer(msg) - return s && err == nil + select { + case p.highRing <- msg: + return true + default: + return false + } } panic(priority) } diff --git a/p2p/sync.go b/p2p/sync.go index 32449d428..60304470e 100644 --- a/p2p/sync.go +++ b/p2p/sync.go @@ -142,15 +142,14 @@ func (me *Peer) getSyncPointOffset(p *Peer) (map[crypto.Hash]*SyncPoint, uint64) startAt := time.Now() for !me.closing && !p.closing { - item, err := p.syncRing.Poll(false) - if err != nil { - break - } else if item == nil { + var g []*SyncPoint + select { + case g = <-p.syncRing: + default: time.Sleep(100 * time.Millisecond) continue } - g := item.([]*SyncPoint) graph = make(map[crypto.Hash]*SyncPoint) for _, r := range g { graph[r.NodeId] = r diff --git a/util/ring.go b/util/ring.go deleted file mode 100644 index 088131a9a..000000000 --- a/util/ring.go +++ /dev/null @@ -1,164 +0,0 @@ -package util - -import ( - "errors" - "runtime" - "sync/atomic" -) - -var ( - // ErrDisposed is returned when an operation is performed on a disposed - // queue. - ErrDisposed = errors.New(`queue: disposed`) -) - -// roundUp takes a uint64 greater than 0 and rounds it up to the next -// power of 2. -func roundUp(v uint64) uint64 { - v-- - v |= v >> 1 - v |= v >> 2 - v |= v >> 4 - v |= v >> 8 - v |= v >> 16 - v |= v >> 32 - v++ - return v -} - -type node struct { - position uint64 - data any -} - -// RingBuffer is a MPMC buffer that achieves thread safety with CAS operations -// only. A put on full or get on empty call will block until an item -// is put or retrieved. Calling Dispose on the RingBuffer will unblock -// any blocked threads with an error. This buffer is similar to the buffer -// described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue -// with some minor additions. -type RingBuffer struct { - _padding0 [8]uint64 - queue uint64 - _padding1 [8]uint64 - dequeue uint64 - _padding2 [8]uint64 - mask, disposed uint64 - _padding3 [8]uint64 - nodes []*node -} - -func (rb *RingBuffer) init(size uint64) { - size = roundUp(size) - rb.nodes = make([]*node, size) - for i := uint64(0); i < size; i++ { - rb.nodes[i] = &node{position: i} - } - rb.mask = size - 1 // so we don't have to do this with every put/get operation -} - -func (rb *RingBuffer) Reset() { - atomic.StoreUint64(&rb.disposed, 1) - atomic.StoreUint64(&rb.queue, 0) - atomic.StoreUint64(&rb.dequeue, 0) - for i, n := range rb.nodes { - n.position = uint64(i) - } - atomic.StoreUint64(&rb.disposed, 0) -} - -// Offer adds the provided item to the queue if there is space. If the queue -// is full, this call will return false. An error will be returned if the -// queue is disposed. -func (rb *RingBuffer) Offer(item any) (bool, error) { - return rb.put(item, true) -} - -func (rb *RingBuffer) put(item any, offer bool) (bool, error) { - var n *node - pos := atomic.LoadUint64(&rb.queue) -L: - for { - if atomic.LoadUint64(&rb.disposed) == 1 { - return false, ErrDisposed - } - - n = rb.nodes[pos&rb.mask] - seq := atomic.LoadUint64(&n.position) - switch dif := seq - pos; { - case dif == 0: - if atomic.CompareAndSwapUint64(&rb.queue, pos, pos+1) { - break L - } - default: - pos = atomic.LoadUint64(&rb.queue) - } - - if offer { - return false, nil - } - runtime.Gosched() // free up the cpu before the next iteration - } - - n.data = item - atomic.StoreUint64(&n.position, pos+1) - return true, nil -} - -// Poll will return the next item in the queue. This call will block -// if the queue is empty. This call will unblock when an item is added -// to the queue, Dispose is called on the queue, or the timeout is reached. An -// error will be returned if the queue is disposed or a timeout occurs. A -// non-positive timeout will block indefinitely. -func (rb *RingBuffer) Poll(block bool) (any, error) { - var ( - n *node - pos = atomic.LoadUint64(&rb.dequeue) - ) -L: - for { - if atomic.LoadUint64(&rb.disposed) == 1 { - return nil, ErrDisposed - } - - n = rb.nodes[pos&rb.mask] - seq := atomic.LoadUint64(&n.position) - switch dif := seq - (pos + 1); { - case dif == 0: - if atomic.CompareAndSwapUint64(&rb.dequeue, pos, pos+1) { - break L - } - default: - pos = atomic.LoadUint64(&rb.dequeue) - } - - if !block { - return nil, nil - } - runtime.Gosched() // free up the cpu before the next iteration - } - data := n.data - n.data = nil - atomic.StoreUint64(&n.position, pos+rb.mask+1) - return data, nil -} - -// Len returns the number of items in the queue. -func (rb *RingBuffer) Len() uint64 { - return atomic.LoadUint64(&rb.queue) - atomic.LoadUint64(&rb.dequeue) -} - -// Dispose will dispose of this queue and free any blocked threads -// in the Put and/or Get methods. Calling those methods on a disposed -// queue will return an error. -func (rb *RingBuffer) Dispose() { - atomic.CompareAndSwapUint64(&rb.disposed, 0, 1) -} - -// NewRingBuffer will allocate, initialize, and return a ring buffer -// with the specified size. -func NewRingBuffer(size uint64) *RingBuffer { - rb := &RingBuffer{} - rb.init(size) - return rb -} diff --git a/util/ring_test.go b/util/ring_test.go deleted file mode 100644 index 27934b68b..000000000 --- a/util/ring_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package util - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestRingBuffer(t *testing.T) { - require := require.New(t) - - pool := NewRingBuffer(256) - for i := 0; i < 256; i++ { - ok, err := pool.Offer(i) - require.True(ok) - require.Nil(err) - } - for i := 0; i < 256; i++ { - ok, err := pool.Offer(i) - require.False(ok) - require.Nil(err) - } - for i := 0; i < 256; i++ { - m, err := pool.Poll(false) - require.NotNil(m) - require.Nil(err) - } - for i := 0; i < 256; i++ { - m, err := pool.Poll(false) - require.Nil(m) - require.Nil(err) - } -}