Skip to content

Commit

Permalink
improve relayer message handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricfung committed Jul 7, 2024
1 parent 75ca9f6 commit 171d139
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 84 deletions.
8 changes: 8 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,14 @@ func listPeersCmd(c *cli.Context) error {
return err
}

func listRelayersCmd(c *cli.Context) error {
data, err := callRPC(c.String("node"), "listrelayers", []any{c.String("id")}, c.Bool("time"))
if err == nil {
fmt.Println(string(data))
}
return err
}

func dumpGraphHeadCmd(c *cli.Context) error {
data, err := callRPC(c.String("node"), "dumpgraphhead", []any{}, c.Bool("time"))
if err == nil {
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,17 @@ func main() {
Usage: "List all the connected peers",
Action: listPeersCmd,
},
{
Name: "listrelayers",
Usage: "List the remote relayers for peer",
Action: listRelayersCmd,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "id",
Usage: "the peer node id",
},
},
},
{
Name: "dumpgraphhead",
Usage: "Dump the graph head",
Expand Down
56 changes: 27 additions & 29 deletions p2p/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,36 +445,40 @@ func (me *Peer) relayOrHandlePeerMessage(relayerId crypto.Hash, msg *PeerMessage
}
return me.handlePeerMessage(from, rm)
}
if me.relayer == nil {
if !me.IsRelayer() {
return nil
}
peer := me.consumers.Get(to)
if peer == nil {
peer = me.relayers.Get(to)
}
if peer == nil {
peer = me.remoteRelayers.Get(to)
}
if peer == nil || peer.IdForNetwork == relayerId {
return nil

var relayers []*Peer
peer := me.GetNeighbor(to)
if peer != nil {
relayers = []*Peer{peer}
} else {
relayers = me.GetRemoteRelayers(to)
}
data := append([]byte{PeerMessageTypeRelay}, msg.Data...)
rk := crypto.Blake3Hash(append(msg.Data, to[:]...))
rk = crypto.Blake3Hash(append(rk[:], relayerId[:]...))
success := me.offerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data})
if !success {
logger.Verbosef("peer.offer(%s) relayer timeout\n", peer.IdForNetwork)
rk := crypto.Blake3Hash(data)
rk = crypto.Blake3Hash(append(rk[:], []byte("REMOTE")...))
for _, peer := range relayers {
if peer.IdForNetwork == relayerId {
return nil
}
rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...))
success := me.offerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data})
if !success {
logger.Verbosef("me.offerWithCacheCheck(%s) relayer timeout\n", peer.IdForNetwork)
}
}
return nil
}

func (me *Peer) updateRemoteRelayerConsumers(relayerId crypto.Hash, data []byte) error {
logger.Verbosef("me.updateRemoteRelayerConsumers(%s, %s) => %x", me.Address, relayerId, data)
relayer := me.relayers.Get(relayerId)
if relayer == nil {
relayer = me.consumers.Get(relayerId)
if !me.IsRelayer() {
return nil
}
if relayer == nil || !relayer.isRemoteRelayer {
relayer := me.GetNeighbor(relayerId)
if relayer == nil || !relayer.IsRelayer() {
return nil
}
pl := len(crypto.Key{}) + 137
Expand All @@ -483,15 +487,12 @@ func (me *Peer) updateRemoteRelayerConsumers(relayerId crypto.Hash, data []byte)
copy(id[:], data[:32])
token, err := me.handle.AuthenticateAs(relayerId, data[32:pl], 0)
if err != nil {
return nil
panic(err)
}
if token.PeerId != id {
return nil
}
old := me.remoteRelayers.Get(id)
if old == nil || old.consumerAuth == nil || old.consumerAuth.Timestamp < token.Timestamp {
me.remoteRelayers.Set(id, relayer)
panic(id)
}
me.remoteRelayers.Add(id, relayer.IdForNetwork)
data = data[pl:]
}
return nil
Expand All @@ -513,10 +514,7 @@ func (me *Peer) handlePeerMessage(peerId crypto.Hash, msg *PeerMessage) error {
if err != nil {
return err
}
peer := me.relayers.Get(peerId)
if peer == nil {
peer = me.consumers.Get(peerId)
}
peer := me.GetNeighbor(peerId)
if peer != nil {
peer.syncRing.Offer(msg.Graph)

Check failure on line 519 in p2p/handle.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `peer.syncRing.Offer` is not checked (errcheck)
}
Expand Down
176 changes: 129 additions & 47 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"net"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -34,10 +35,10 @@ type Peer struct {
ops chan struct{}
stn chan struct{}

relayer *QuicRelayer
consumerAuth *AuthToken
isRemoteRelayer bool
remoteRelayers *neighborMap
relayer *QuicRelayer
consumerAuth *AuthToken
isRelayer bool
remoteRelayers *relayersMap
}

type SyncPoint struct {
Expand All @@ -52,14 +53,18 @@ type ChanMsg struct {
data []byte
}

func (me *Peer) IsRelayer() bool {
return me.isRelayer
}

func (me *Peer) ConnectRelayer(idForNetwork crypto.Hash, addr string) {
if a, err := net.ResolveUDPAddr("udp", addr); err != nil {
panic(fmt.Errorf("invalid address %s %s", addr, err))
} else if a.Port < 80 || a.IP == nil {
panic(fmt.Errorf("invalid address %s %d %s", addr, a.Port, a.IP))
}
if me.isRemoteRelayer {
me.remoteRelayers = &neighborMap{m: make(map[crypto.Hash]*Peer)}
if me.isRelayer {
me.remoteRelayers = &relayersMap{m: make(map[crypto.Hash][]*remoteRelayer)}
}

for !me.closing {
Expand Down Expand Up @@ -106,7 +111,15 @@ func (me *Peer) connectRelayer(relayer *Peer) error {
func (me *Peer) Neighbors() []*Peer {
relayers := me.relayers.Slice()
consumers := me.consumers.Slice()
return append(relayers, consumers...)
for _, c := range consumers {
if slices.ContainsFunc(relayers, func(p *Peer) bool {
return p.IdForNetwork == c.IdForNetwork
}) {
continue
}
relayers = append(relayers, c)
}
return relayers
}

func (p *Peer) disconnect() {
Expand Down Expand Up @@ -138,19 +151,19 @@ func NewPeer(handle SyncHandle, idForNetwork crypto.Hash, addr string, isRelayer
ringSize = ringSize * MaxIncomingStreams
}
peer := &Peer{
IdForNetwork: idForNetwork,
Address: addr,
relayers: &neighborMap{m: make(map[crypto.Hash]*Peer)},
consumers: &neighborMap{m: make(map[crypto.Hash]*Peer)},
highRing: util.NewRingBuffer(ringSize),
normalRing: util.NewRingBuffer(ringSize),
syncRing: util.NewRingBuffer(ringSize),
handle: handle,
sentMetric: &MetricPool{enabled: false},
receivedMetric: &MetricPool{enabled: false},
ops: make(chan struct{}),
stn: make(chan struct{}),
isRemoteRelayer: isRelayer,
IdForNetwork: idForNetwork,
Address: addr,
relayers: &neighborMap{m: make(map[crypto.Hash]*Peer)},
consumers: &neighborMap{m: make(map[crypto.Hash]*Peer)},
highRing: util.NewRingBuffer(ringSize),
normalRing: util.NewRingBuffer(ringSize),
syncRing: util.NewRingBuffer(ringSize),
handle: handle,
sentMetric: &MetricPool{enabled: false},
receivedMetric: &MetricPool{enabled: false},
ops: make(chan struct{}),
stn: make(chan struct{}),
isRelayer: isRelayer,
}
peer.ctx = context.Background() // FIXME use real context
if handle != nil {
Expand Down Expand Up @@ -187,7 +200,7 @@ func (me *Peer) ListenConsumers() error {
return err
}
me.relayer = relayer
me.remoteRelayers = &neighborMap{m: make(map[crypto.Hash]*Peer)}
me.remoteRelayers = &relayersMap{m: make(map[crypto.Hash][]*remoteRelayer)}

go func() {
ticker := time.NewTicker(time.Duration(config.SnapshotRoundGap))
Expand All @@ -197,7 +210,7 @@ func (me *Peer) ListenConsumers() error {
neighbors := me.Neighbors()
msg := me.buildConsumersMessage()
for _, p := range neighbors {
if !p.isRemoteRelayer {
if !p.isRelayer {
continue
}
key := crypto.Blake3Hash(append(msg, p.IdForNetwork[:]...))
Expand All @@ -223,8 +236,14 @@ func (me *Peer) ListenConsumers() error {
return
}
defer peer.disconnect()

old := me.consumers.Get(peer.IdForNetwork)
if old != nil {
old.disconnect()
me.consumers.Delete(old.IdForNetwork)
}
if !me.consumers.Put(peer.IdForNetwork, peer) {
return
panic(peer.IdForNetwork)
}
defer me.consumers.Delete(peer.IdForNetwork)

Expand Down Expand Up @@ -411,10 +430,7 @@ func (me *Peer) sendToPeer(to crypto.Hash, typ byte, key, data []byte, priority
}
me.sentMetric.handle(typ)

peer := me.consumers.Get(to)
if peer == nil {
peer = me.relayers.Get(to)
}
peer := me.GetNeighbor(to)
if peer != nil {
success := peer.offer(priority, &ChanMsg{key, data})
if !success {
Expand All @@ -424,29 +440,21 @@ func (me *Peer) sendToPeer(to crypto.Hash, typ byte, key, data []byte, priority
}

rm := me.buildRelayMessage(to, data)
if me.remoteRelayers != nil {
peer = me.remoteRelayers.Get(to)
}
if peer != nil {
rk := crypto.Blake3Hash(append(rm, peer.IdForNetwork[:]...))
success := peer.offer(priority, &ChanMsg{rk[:], rm})
if !success {
return fmt.Errorf("peer.offer(%s, %s) => %d timeout", peer.Address, peer.IdForNetwork, priority)
rk := crypto.Blake3Hash(rm)
rk = crypto.Blake3Hash(append(rk[:], []byte("REMOTE")...))
relayers := me.GetRemoteRelayers(to)
if len(relayers) == 0 {
relayers = me.relayers.Slice()
}
for _, peer := range relayers {
if !peer.IsRelayer() {
panic(peer.IdForNetwork)
}
return nil
}

neighbors := me.Neighbors()
for _, peer := range neighbors {
if !peer.isRemoteRelayer {
continue
}
rk := crypto.Blake3Hash(append(rm, peer.IdForNetwork[:]...))
rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...))
success := peer.offer(priority, &ChanMsg{rk[:], rm})
if success {
break
if !success {
logger.Verbosef("peer.offer(%s) send timeout\n", peer.IdForNetwork)
}
logger.Printf("peer.offer(%s, %s) => %d timeout", peer.Address, peer.IdForNetwork, priority)
}
return nil
}
Expand Down Expand Up @@ -482,6 +490,80 @@ func (m *confirmMap) store(key []byte, ts time.Time) {
m.cache.Set(key, buf, 8)
}

type remoteRelayer struct {
Id crypto.Hash
ActiveAt time.Time
}

type relayersMap struct {
sync.RWMutex
m map[crypto.Hash][]*remoteRelayer
}

func (me *Peer) GetNeighbor(key crypto.Hash) *Peer {
p := me.relayers.Get(key)
if p != nil {
return p
}
return me.consumers.Get(key)
}

func (me *Peer) GetRemoteRelayers(key crypto.Hash) []*Peer {
if me.remoteRelayers == nil {
return nil
}
var relayers []*Peer
ids := me.remoteRelayers.Get(key)
for _, id := range ids {
p := me.GetNeighbor(id)
if p != nil {
relayers = append(relayers, p)
}
}
return relayers
}

func (m *relayersMap) Get(key crypto.Hash) []crypto.Hash {
m.RLock()
defer m.RUnlock()

var relayers []crypto.Hash
for _, r := range m.m[key] {
if r.ActiveAt.Add(time.Minute).Before(time.Now()) {
continue
}
relayers = append(relayers, r.Id)
}
return relayers
}

func (m *relayersMap) Add(key crypto.Hash, v crypto.Hash) {
m.Lock()
defer m.Unlock()

var relayers []*remoteRelayer
for _, r := range m.m[key] {
if r.ActiveAt.Add(time.Minute).After(time.Now()) {
relayers = append(relayers, r)
}
}
for _, r := range relayers {
if r.Id == v {
r.ActiveAt = time.Now()
return
}
}
i := slices.IndexFunc(relayers, func(r *remoteRelayer) bool {
return r.Id == v
})
if i < 0 {
relayers = append(relayers, &remoteRelayer{ActiveAt: time.Now(), Id: v})
} else {
relayers[i].ActiveAt = time.Now()
}
m.m[key] = relayers
}

type neighborMap struct {
sync.RWMutex
m map[crypto.Hash]*Peer
Expand Down
Loading

0 comments on commit 171d139

Please sign in to comment.