From 171d1392f353ff815e1d01e82b4242d3cf75b477 Mon Sep 17 00:00:00 2001 From: Cedric Fung Date: Sun, 7 Jul 2024 17:30:08 +0000 Subject: [PATCH] improve relayer message handling --- command.go | 8 ++ main.go | 11 +++ p2p/handle.go | 56 ++++++------ p2p/peer.go | 176 ++++++++++++++++++++++++++---------- rpc/consensus_test.go | 17 ++-- rpc/internal/server/http.go | 14 ++- rpc/internal/server/node.go | 1 + 7 files changed, 199 insertions(+), 84 deletions(-) diff --git a/command.go b/command.go index e6e72310e..f5443ff4a 100644 --- a/command.go +++ b/command.go @@ -835,6 +835,14 @@ func listPeersCmd(c *cli.Context) error { return err } +func listRelayersCmd(c *cli.Context) error { + data, err := callRPC(c.String("node"), "listrelayers", []any{c.String("id")}, c.Bool("time")) + if err == nil { + fmt.Println(string(data)) + } + return err +} + func dumpGraphHeadCmd(c *cli.Context) error { data, err := callRPC(c.String("node"), "dumpgraphhead", []any{}, c.Bool("time")) if err == nil { diff --git a/main.go b/main.go index 085284964..c2cfb0184 100644 --- a/main.go +++ b/main.go @@ -638,6 +638,17 @@ func main() { Usage: "List all the connected peers", Action: listPeersCmd, }, + { + Name: "listrelayers", + Usage: "List the remote relayers for peer", + Action: listRelayersCmd, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "id", + Usage: "the peer node id", + }, + }, + }, { Name: "dumpgraphhead", Usage: "Dump the graph head", diff --git a/p2p/handle.go b/p2p/handle.go index 8af9baeee..d920763a7 100644 --- a/p2p/handle.go +++ b/p2p/handle.go @@ -445,36 +445,40 @@ func (me *Peer) relayOrHandlePeerMessage(relayerId crypto.Hash, msg *PeerMessage } return me.handlePeerMessage(from, rm) } - if me.relayer == nil { + if !me.IsRelayer() { return nil } - peer := me.consumers.Get(to) - if peer == nil { - peer = me.relayers.Get(to) - } - if peer == nil { - peer = me.remoteRelayers.Get(to) - } - if peer == nil || peer.IdForNetwork == relayerId { - return nil + + var relayers []*Peer + peer := me.GetNeighbor(to) + if peer != nil { + relayers = []*Peer{peer} + } else { + relayers = me.GetRemoteRelayers(to) } data := append([]byte{PeerMessageTypeRelay}, msg.Data...) - rk := crypto.Blake3Hash(append(msg.Data, to[:]...)) - rk = crypto.Blake3Hash(append(rk[:], relayerId[:]...)) - success := me.offerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data}) - if !success { - logger.Verbosef("peer.offer(%s) relayer timeout\n", peer.IdForNetwork) + rk := crypto.Blake3Hash(data) + rk = crypto.Blake3Hash(append(rk[:], []byte("REMOTE")...)) + for _, peer := range relayers { + if peer.IdForNetwork == relayerId { + return nil + } + rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...)) + success := me.offerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data}) + if !success { + logger.Verbosef("me.offerWithCacheCheck(%s) relayer timeout\n", peer.IdForNetwork) + } } return nil } func (me *Peer) updateRemoteRelayerConsumers(relayerId crypto.Hash, data []byte) error { logger.Verbosef("me.updateRemoteRelayerConsumers(%s, %s) => %x", me.Address, relayerId, data) - relayer := me.relayers.Get(relayerId) - if relayer == nil { - relayer = me.consumers.Get(relayerId) + if !me.IsRelayer() { + return nil } - if relayer == nil || !relayer.isRemoteRelayer { + relayer := me.GetNeighbor(relayerId) + if relayer == nil || !relayer.IsRelayer() { return nil } pl := len(crypto.Key{}) + 137 @@ -483,15 +487,12 @@ func (me *Peer) updateRemoteRelayerConsumers(relayerId crypto.Hash, data []byte) copy(id[:], data[:32]) token, err := me.handle.AuthenticateAs(relayerId, data[32:pl], 0) if err != nil { - return nil + panic(err) } if token.PeerId != id { - return nil - } - old := me.remoteRelayers.Get(id) - if old == nil || old.consumerAuth == nil || old.consumerAuth.Timestamp < token.Timestamp { - me.remoteRelayers.Set(id, relayer) + panic(id) } + me.remoteRelayers.Add(id, relayer.IdForNetwork) data = data[pl:] } return nil @@ -513,10 +514,7 @@ func (me *Peer) handlePeerMessage(peerId crypto.Hash, msg *PeerMessage) error { if err != nil { return err } - peer := me.relayers.Get(peerId) - if peer == nil { - peer = me.consumers.Get(peerId) - } + peer := me.GetNeighbor(peerId) if peer != nil { peer.syncRing.Offer(msg.Graph) } diff --git a/p2p/peer.go b/p2p/peer.go index 57eff3fe8..1e3eebc26 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "net" + "slices" "sync" "time" @@ -34,10 +35,10 @@ type Peer struct { ops chan struct{} stn chan struct{} - relayer *QuicRelayer - consumerAuth *AuthToken - isRemoteRelayer bool - remoteRelayers *neighborMap + relayer *QuicRelayer + consumerAuth *AuthToken + isRelayer bool + remoteRelayers *relayersMap } type SyncPoint struct { @@ -52,14 +53,18 @@ type ChanMsg struct { data []byte } +func (me *Peer) IsRelayer() bool { + return me.isRelayer +} + func (me *Peer) ConnectRelayer(idForNetwork crypto.Hash, addr string) { if a, err := net.ResolveUDPAddr("udp", addr); err != nil { panic(fmt.Errorf("invalid address %s %s", addr, err)) } else if a.Port < 80 || a.IP == nil { panic(fmt.Errorf("invalid address %s %d %s", addr, a.Port, a.IP)) } - if me.isRemoteRelayer { - me.remoteRelayers = &neighborMap{m: make(map[crypto.Hash]*Peer)} + if me.isRelayer { + me.remoteRelayers = &relayersMap{m: make(map[crypto.Hash][]*remoteRelayer)} } for !me.closing { @@ -106,7 +111,15 @@ func (me *Peer) connectRelayer(relayer *Peer) error { func (me *Peer) Neighbors() []*Peer { relayers := me.relayers.Slice() consumers := me.consumers.Slice() - return append(relayers, consumers...) + for _, c := range consumers { + if slices.ContainsFunc(relayers, func(p *Peer) bool { + return p.IdForNetwork == c.IdForNetwork + }) { + continue + } + relayers = append(relayers, c) + } + return relayers } func (p *Peer) disconnect() { @@ -138,19 +151,19 @@ func NewPeer(handle SyncHandle, idForNetwork crypto.Hash, addr string, isRelayer ringSize = ringSize * MaxIncomingStreams } peer := &Peer{ - IdForNetwork: idForNetwork, - Address: addr, - relayers: &neighborMap{m: make(map[crypto.Hash]*Peer)}, - consumers: &neighborMap{m: make(map[crypto.Hash]*Peer)}, - highRing: util.NewRingBuffer(ringSize), - normalRing: util.NewRingBuffer(ringSize), - syncRing: util.NewRingBuffer(ringSize), - handle: handle, - sentMetric: &MetricPool{enabled: false}, - receivedMetric: &MetricPool{enabled: false}, - ops: make(chan struct{}), - stn: make(chan struct{}), - isRemoteRelayer: isRelayer, + IdForNetwork: idForNetwork, + Address: addr, + relayers: &neighborMap{m: make(map[crypto.Hash]*Peer)}, + consumers: &neighborMap{m: make(map[crypto.Hash]*Peer)}, + highRing: util.NewRingBuffer(ringSize), + normalRing: util.NewRingBuffer(ringSize), + syncRing: util.NewRingBuffer(ringSize), + handle: handle, + sentMetric: &MetricPool{enabled: false}, + receivedMetric: &MetricPool{enabled: false}, + ops: make(chan struct{}), + stn: make(chan struct{}), + isRelayer: isRelayer, } peer.ctx = context.Background() // FIXME use real context if handle != nil { @@ -187,7 +200,7 @@ func (me *Peer) ListenConsumers() error { return err } me.relayer = relayer - me.remoteRelayers = &neighborMap{m: make(map[crypto.Hash]*Peer)} + me.remoteRelayers = &relayersMap{m: make(map[crypto.Hash][]*remoteRelayer)} go func() { ticker := time.NewTicker(time.Duration(config.SnapshotRoundGap)) @@ -197,7 +210,7 @@ func (me *Peer) ListenConsumers() error { neighbors := me.Neighbors() msg := me.buildConsumersMessage() for _, p := range neighbors { - if !p.isRemoteRelayer { + if !p.isRelayer { continue } key := crypto.Blake3Hash(append(msg, p.IdForNetwork[:]...)) @@ -223,8 +236,14 @@ func (me *Peer) ListenConsumers() error { return } defer peer.disconnect() + + old := me.consumers.Get(peer.IdForNetwork) + if old != nil { + old.disconnect() + me.consumers.Delete(old.IdForNetwork) + } if !me.consumers.Put(peer.IdForNetwork, peer) { - return + panic(peer.IdForNetwork) } defer me.consumers.Delete(peer.IdForNetwork) @@ -411,10 +430,7 @@ func (me *Peer) sendToPeer(to crypto.Hash, typ byte, key, data []byte, priority } me.sentMetric.handle(typ) - peer := me.consumers.Get(to) - if peer == nil { - peer = me.relayers.Get(to) - } + peer := me.GetNeighbor(to) if peer != nil { success := peer.offer(priority, &ChanMsg{key, data}) if !success { @@ -424,29 +440,21 @@ func (me *Peer) sendToPeer(to crypto.Hash, typ byte, key, data []byte, priority } rm := me.buildRelayMessage(to, data) - if me.remoteRelayers != nil { - peer = me.remoteRelayers.Get(to) - } - if peer != nil { - rk := crypto.Blake3Hash(append(rm, peer.IdForNetwork[:]...)) - success := peer.offer(priority, &ChanMsg{rk[:], rm}) - if !success { - return fmt.Errorf("peer.offer(%s, %s) => %d timeout", peer.Address, peer.IdForNetwork, priority) + rk := crypto.Blake3Hash(rm) + rk = crypto.Blake3Hash(append(rk[:], []byte("REMOTE")...)) + relayers := me.GetRemoteRelayers(to) + if len(relayers) == 0 { + relayers = me.relayers.Slice() + } + for _, peer := range relayers { + if !peer.IsRelayer() { + panic(peer.IdForNetwork) } - return nil - } - - neighbors := me.Neighbors() - for _, peer := range neighbors { - if !peer.isRemoteRelayer { - continue - } - rk := crypto.Blake3Hash(append(rm, peer.IdForNetwork[:]...)) + rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...)) success := peer.offer(priority, &ChanMsg{rk[:], rm}) - if success { - break + if !success { + logger.Verbosef("peer.offer(%s) send timeout\n", peer.IdForNetwork) } - logger.Printf("peer.offer(%s, %s) => %d timeout", peer.Address, peer.IdForNetwork, priority) } return nil } @@ -482,6 +490,80 @@ func (m *confirmMap) store(key []byte, ts time.Time) { m.cache.Set(key, buf, 8) } +type remoteRelayer struct { + Id crypto.Hash + ActiveAt time.Time +} + +type relayersMap struct { + sync.RWMutex + m map[crypto.Hash][]*remoteRelayer +} + +func (me *Peer) GetNeighbor(key crypto.Hash) *Peer { + p := me.relayers.Get(key) + if p != nil { + return p + } + return me.consumers.Get(key) +} + +func (me *Peer) GetRemoteRelayers(key crypto.Hash) []*Peer { + if me.remoteRelayers == nil { + return nil + } + var relayers []*Peer + ids := me.remoteRelayers.Get(key) + for _, id := range ids { + p := me.GetNeighbor(id) + if p != nil { + relayers = append(relayers, p) + } + } + return relayers +} + +func (m *relayersMap) Get(key crypto.Hash) []crypto.Hash { + m.RLock() + defer m.RUnlock() + + var relayers []crypto.Hash + for _, r := range m.m[key] { + if r.ActiveAt.Add(time.Minute).Before(time.Now()) { + continue + } + relayers = append(relayers, r.Id) + } + return relayers +} + +func (m *relayersMap) Add(key crypto.Hash, v crypto.Hash) { + m.Lock() + defer m.Unlock() + + var relayers []*remoteRelayer + for _, r := range m.m[key] { + if r.ActiveAt.Add(time.Minute).After(time.Now()) { + relayers = append(relayers, r) + } + } + for _, r := range relayers { + if r.Id == v { + r.ActiveAt = time.Now() + return + } + } + i := slices.IndexFunc(relayers, func(r *remoteRelayer) bool { + return r.Id == v + }) + if i < 0 { + relayers = append(relayers, &remoteRelayer{ActiveAt: time.Now(), Id: v}) + } else { + relayers[i].ActiveAt = time.Now() + } + m.m[key] = relayers +} + type neighborMap struct { sync.RWMutex m map[crypto.Hash]*Peer diff --git a/rpc/consensus_test.go b/rpc/consensus_test.go index 8e28e095e..1763c3509 100644 --- a/rpc/consensus_test.go +++ b/rpc/consensus_test.go @@ -35,7 +35,7 @@ func TestConsensus(t *testing.T) { testConsensus(t, false) } -func testConsensus(t *testing.T, withRelayers bool) { +func testConsensus(t *testing.T, extrenalRelayers bool) { require := require.New(t) kernel.TestMockReset() @@ -55,7 +55,7 @@ func testConsensus(t *testing.T, withRelayers bool) { require.Nil(err) defer os.RemoveAll(root) - accounts, payees, gdata, plist := setupTestNet(root, withRelayers) + accounts, payees, gdata, plist := setupTestNet(root, extrenalRelayers) require.Len(accounts, NODES) epoch := time.Unix(1551312000, 0) @@ -750,7 +750,7 @@ func testDetermineAccountByIndex(i int, role string) common.Address { return account } -func setupTestNet(root string, withRelayers bool) ([]common.Address, []common.Address, []byte, string) { +func setupTestNet(root string, extrenalRelayers bool) ([]common.Address, []common.Address, []byte, string) { var signers, payees, custodians []common.Address var relayers []common.Address @@ -795,7 +795,7 @@ func setupTestNet(root string, withRelayers bool) ([]common.Address, []common.Ad peersListHead := `"` + strings.Join(peers[:len(peers)/3], `","`) + `"` peersListTail := `"` + strings.Join(peers[len(peers)/2:], `","`) + `"` - if withRelayers { + if extrenalRelayers { peers := make([]string, len(relayers)) for i, s := range relayers { id := s.Hash().ForNetwork(gns.NetworkId()) @@ -803,7 +803,6 @@ func setupTestNet(root string, withRelayers bool) ([]common.Address, []common.Ad } peersListHead = `"` + strings.Join(peers[:len(peers)/3], `","`) + `"` peersListTail = `"` + strings.Join(peers[len(peers)/2:], `","`) + `"` - peersList := `"` + strings.Join(peers[:len(peers)/3], `","`) + `"` for i, a := range relayers { dir := fmt.Sprintf("%s/mixin-160%02d", root, i+1) err := os.MkdirAll(dir, 0755) @@ -811,7 +810,8 @@ func setupTestNet(root string, withRelayers bool) ([]common.Address, []common.Ad panic(err) } - configData := []byte(fmt.Sprintf(configDataTmpl, a.PrivateSpendKey, 16000+i+1, peersList, true, 0)) + rpcPort := 26000 + i + 1 + configData := []byte(fmt.Sprintf(configDataTmpl, a.PrivateSpendKey, 16000+i+1, peersListHead, true, rpcPort)) err = os.WriteFile(dir+"/config.toml", configData, 0644) if err != nil { panic(err) @@ -825,6 +825,9 @@ func setupTestNet(root string, withRelayers bool) ([]common.Address, []common.Ad cache := newCache(custom) store, _ := storage.NewBadgerStore(custom, dir) node, _ := kernel.SetupNode(custom, store, cache, gns) + + server := NewServer(custom, store, node, rpcPort) + go server.ListenAndServe() go node.Loop() } } @@ -842,7 +845,7 @@ func setupTestNet(root string, withRelayers bool) ([]common.Address, []common.Ad } port := 17000 + i + 1 p2p := fmt.Sprint(port) - isRelayer := !withRelayers && (strings.Contains(peersListHead, p2p) || strings.Contains(peersListTail, p2p)) + isRelayer := !extrenalRelayers && (strings.Contains(peersListHead, p2p) || strings.Contains(peersListTail, p2p)) if isRelayer { peersList = peersListHead } diff --git a/rpc/internal/server/http.go b/rpc/internal/server/http.go index a95e222ae..5a87c7c52 100644 --- a/rpc/internal/server/http.go +++ b/rpc/internal/server/http.go @@ -2,12 +2,14 @@ package server import ( "encoding/json" + "errors" "fmt" "net/http" "strings" "time" "github.com/MixinNetwork/mixin/config" + "github.com/MixinNetwork/mixin/crypto" "github.com/MixinNetwork/mixin/kernel" "github.com/MixinNetwork/mixin/storage" ) @@ -111,7 +113,17 @@ func (impl *RPC) ServeHTTP(w http.ResponseWriter, r *http.Request) { peers = peerNeighbors(impl.Node.Peer.Neighbors()) } rdr.RenderData(peers) - return + case "listrelayers": + if len(call.Params) != 1 { + rdr.RenderError(errors.New("invalid params count")) + return + } + peers := make([]map[string]any, 0) + if strings.HasPrefix(r.RemoteAddr, "127.0.0.1:") { + id, _ := crypto.HashFromString(fmt.Sprint(call.Params[0])) + peers = peerNeighbors(impl.Node.Peer.GetRemoteRelayers(id)) + } + rdr.RenderData(peers) case "dumpgraphhead": data, err := dumpGraphHead(impl.Node, call.Params) if err != nil { diff --git a/rpc/internal/server/node.go b/rpc/internal/server/node.go index 3c4635dde..88441b941 100644 --- a/rpc/internal/server/node.go +++ b/rpc/internal/server/node.go @@ -50,6 +50,7 @@ func peerNeighbors(peers []*p2p.Peer) []map[string]any { data = append(data, map[string]any{ "id": p.IdForNetwork.String(), "address": p.Address, + "relayer": p.IsRelayer(), }) } return data