Skip to content

Commit

Permalink
replace peer queue ring buffer with go channel
Browse files Browse the repository at this point in the history
  • Loading branch information
vanessaviolet committed Jul 8, 2024
1 parent e9eb373 commit 4a3d4e4
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 263 deletions.
5 changes: 4 additions & 1 deletion p2p/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,10 @@ func (me *Peer) handlePeerMessage(peerId crypto.Hash, msg *PeerMessage) error {
}
nbrs := me.GetNeighbors(peerId)
for _, peer := range nbrs {
peer.syncRing.Offer(msg.Graph)
select {
case peer.syncRing <- msg.Graph:
default:
}
}
return nil
case PeerMessageTypeTransactionRequest:
Expand Down
108 changes: 48 additions & 60 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import (
"context"
"fmt"
"net"
"slices"
"sync"
"time"

"github.com/MixinNetwork/mixin/config"
"github.com/MixinNetwork/mixin/crypto"
"github.com/MixinNetwork/mixin/logger"
"github.com/MixinNetwork/mixin/util"
)

type Peer struct {
Expand All @@ -26,9 +24,9 @@ type Peer struct {
relayers *neighborMap
consumers *neighborMap
snapshotsCaches *confirmMap
highRing *util.RingBuffer
normalRing *util.RingBuffer
syncRing *util.RingBuffer
highRing chan *ChanMsg
normalRing chan *ChanMsg
syncRing chan []*SyncPoint
closing bool
ops chan struct{}
stn chan struct{}
Expand Down Expand Up @@ -109,26 +107,18 @@ func (me *Peer) connectRelayer(relayer *Peer) error {
func (me *Peer) Neighbors() []*Peer {
relayers := me.relayers.Slice()
consumers := me.consumers.Slice()
for _, c := range consumers {
if slices.ContainsFunc(relayers, func(p *Peer) bool {
return p.IdForNetwork == c.IdForNetwork
}) {
continue
}
relayers = append(relayers, c)
}
return relayers
return append(relayers, consumers...)
}

func (p *Peer) disconnect() {
if p.closing {
return
}
p.closing = true
p.highRing.Dispose()
p.normalRing.Dispose()
p.syncRing.Dispose()
<-p.ops
close(p.highRing)
close(p.normalRing)
close(p.syncRing)
<-p.stn
}

Expand All @@ -145,17 +135,14 @@ func (me *Peer) Metric() map[string]*MetricPool {

func NewPeer(handle SyncHandle, idForNetwork crypto.Hash, addr string, isRelayer bool) *Peer {
ringSize := uint64(1024)
if isRelayer {
ringSize = ringSize * MaxIncomingStreams
}
peer := &Peer{
IdForNetwork: idForNetwork,
Address: addr,
relayers: &neighborMap{m: make(map[crypto.Hash]*Peer)},
consumers: &neighborMap{m: make(map[crypto.Hash]*Peer)},
highRing: util.NewRingBuffer(ringSize),
normalRing: util.NewRingBuffer(ringSize),
syncRing: util.NewRingBuffer(ringSize),
highRing: make(chan *ChanMsg, ringSize),
normalRing: make(chan *ChanMsg, ringSize),
syncRing: make(chan []*SyncPoint, 128),
handle: handle,
sentMetric: &MetricPool{enabled: false},
receivedMetric: &MetricPool{enabled: false},
Expand All @@ -175,9 +162,9 @@ func (me *Peer) Teardown() {
if me.relayer != nil {
me.relayer.Close()
}
me.highRing.Dispose()
me.normalRing.Dispose()
me.syncRing.Dispose()
close(me.highRing)
close(me.normalRing)
close(me.syncRing)
peers := me.Neighbors()
var wg sync.WaitGroup
for _, p := range peers {
Expand Down Expand Up @@ -211,8 +198,7 @@ func (me *Peer) ListenConsumers() error {
if !p.isRelayer {
continue
}
key := crypto.Blake3Hash(append(msg, p.IdForNetwork[:]...))
me.sendToPeer(p.IdForNetwork, PeerMessageTypeConsumers, key[:], msg, MsgPriorityNormal)
me.offerToPeerWithCacheCheck(p, MsgPriorityNormal, &ChanMsg{nil, msg})
}

<-ticker.C
Expand Down Expand Up @@ -261,34 +247,9 @@ func (me *Peer) loopSendingStream(p *Peer, consumer Client) (*ChanMsg, error) {
defer consumer.Close("loopSendingStream")

for !me.closing && !p.closing {
msgs := []*ChanMsg{}
for len(msgs) < 16 {
item, err := p.highRing.Poll(false)
if err != nil {
return nil, fmt.Errorf("peer.highRing(%s) => %v", p.IdForNetwork, err)
} else if item == nil {
break
}
msg := item.(*ChanMsg)
if me.snapshotsCaches.contains(msg.key, time.Minute) {
continue
}
msgs = append(msgs, msg)
}

for len(msgs) < 32 {
item, err := p.normalRing.Poll(false)
if err != nil {
return nil, fmt.Errorf("peer.normalRing(%s) => %v", p.IdForNetwork, err)
} else if item == nil {
break
}
msg := item.(*ChanMsg)
if me.snapshotsCaches.contains(msg.key, time.Minute) {
continue
}
msgs = append(msgs, msg)
}
hm := me.pollRingWithCache(p.highRing, 16)
nm := me.pollRingWithCache(p.normalRing, 16)
msgs := append(hm, nm...)

if len(msgs) == 0 {
time.Sleep(300 * time.Millisecond)
Expand All @@ -309,6 +270,22 @@ func (me *Peer) loopSendingStream(p *Peer, consumer Client) (*ChanMsg, error) {
return nil, fmt.Errorf("PEER DONE")
}

func (me *Peer) pollRingWithCache(ring chan *ChanMsg, limit int) []*ChanMsg {
var msgs []*ChanMsg
for len(msgs) < limit {
select {
case msg := <-ring:
if me.snapshotsCaches.contains(msg.key, time.Minute) {
continue
}
msgs = append(msgs, msg)
default:
return msgs
}
}
return msgs
}

func (me *Peer) loopReceiveMessage(peer *Peer, client Client) {
logger.Printf("me.loopReceiveMessage(%s, %s)", me.Address, client.RemoteAddr().String())
receive := make(chan *PeerMessage, 1024)
Expand Down Expand Up @@ -408,13 +385,24 @@ func (me *Peer) offerToPeerWithCacheCheck(p *Peer, priority int, msg *ChanMsg) b
}

func (p *Peer) offer(priority int, msg *ChanMsg) bool {
if p.closing {
return false
}
switch priority {
case MsgPriorityNormal:
s, err := p.normalRing.Offer(msg)
return s && err == nil
select {
case p.normalRing <- msg:
return true
default:
return false
}
case MsgPriorityHigh:
s, err := p.highRing.Offer(msg)
return s && err == nil
select {
case p.highRing <- msg:
return true
default:
return false
}
}
panic(priority)
}
Expand Down
9 changes: 4 additions & 5 deletions p2p/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,14 @@ func (me *Peer) getSyncPointOffset(p *Peer) (map[crypto.Hash]*SyncPoint, uint64)

startAt := time.Now()
for !me.closing && !p.closing {
item, err := p.syncRing.Poll(false)
if err != nil {
break
} else if item == nil {
var g []*SyncPoint
select {
case g = <-p.syncRing:
default:
time.Sleep(100 * time.Millisecond)
continue
}

g := item.([]*SyncPoint)
graph = make(map[crypto.Hash]*SyncPoint)
for _, r := range g {
graph[r.NodeId] = r
Expand Down
164 changes: 0 additions & 164 deletions util/ring.go

This file was deleted.

Loading

0 comments on commit 4a3d4e4

Please sign in to comment.