Skip to content

Commit

Permalink
improve relayer message cache
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricfung committed Jul 7, 2024
1 parent 171d139 commit 534f52c
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 152 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/MixinNetwork/mixin

go 1.22.4
go 1.22.5

replace github.com/dgraph-io/badger/v4 => github.com/MixinNetwork/badger/v4 v4.2.0-F1

Expand Down
4 changes: 2 additions & 2 deletions p2p/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,9 @@ func (me *Peer) relayOrHandlePeerMessage(relayerId crypto.Hash, msg *PeerMessage
return nil
}
rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...))
success := me.offerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data})
success := me.offerToPeerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data})
if !success {
logger.Verbosef("me.offerWithCacheCheck(%s) relayer timeout\n", peer.IdForNetwork)
logger.Verbosef("me.offerToPeerWithCacheCheck(%s) relayer timeout\n", peer.IdForNetwork)
}
}
return nil
Expand Down
152 changes: 3 additions & 149 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2p

import (
"context"
"encoding/binary"
"fmt"
"net"
"slices"
Expand All @@ -13,7 +12,6 @@ import (
"github.com/MixinNetwork/mixin/crypto"
"github.com/MixinNetwork/mixin/logger"
"github.com/MixinNetwork/mixin/util"
"github.com/dgraph-io/ristretto"
)

type Peer struct {
Expand Down Expand Up @@ -399,7 +397,7 @@ func (me *Peer) sendHighToPeer(to crypto.Hash, typ byte, key, data []byte) error
return me.sendToPeer(to, typ, key, data, MsgPriorityHigh)
}

func (me *Peer) offerWithCacheCheck(p *Peer, priority int, msg *ChanMsg) bool {
func (me *Peer) offerToPeerWithCacheCheck(p *Peer, priority int, msg *ChanMsg) bool {
if p.IdForNetwork == me.IdForNetwork {
return true
}
Expand Down Expand Up @@ -451,9 +449,9 @@ func (me *Peer) sendToPeer(to crypto.Hash, typ byte, key, data []byte, priority
panic(peer.IdForNetwork)
}
rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...))
success := peer.offer(priority, &ChanMsg{rk[:], rm})
success := me.offerToPeerWithCacheCheck(peer, priority, &ChanMsg{rk[:], rm})
if !success {
logger.Verbosef("peer.offer(%s) send timeout\n", peer.IdForNetwork)
logger.Verbosef("me.offerToPeerWithCacheCheck(%s) send timeout\n", peer.IdForNetwork)
}
}
return nil
Expand All @@ -465,41 +463,6 @@ func (me *Peer) sendSnapshotMessageToPeer(to crypto.Hash, snap crypto.Hash, typ
return me.sendToPeer(to, typ, key, data, MsgPriorityNormal)
}

type confirmMap struct {
cache *ristretto.Cache
}

func (m *confirmMap) contains(key []byte, duration time.Duration) bool {
if key == nil {
return false
}
val, found := m.cache.Get(key)
if found {
ts := time.Unix(0, int64(binary.BigEndian.Uint64(val.([]byte))))
return ts.Add(duration).After(time.Now())
}
return false
}

func (m *confirmMap) store(key []byte, ts time.Time) {
if key == nil {
panic(ts)
}
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(ts.UnixNano()))
m.cache.Set(key, buf, 8)
}

type remoteRelayer struct {
Id crypto.Hash
ActiveAt time.Time
}

type relayersMap struct {
sync.RWMutex
m map[crypto.Hash][]*remoteRelayer
}

func (me *Peer) GetNeighbor(key crypto.Hash) *Peer {
p := me.relayers.Get(key)
if p != nil {
Expand All @@ -522,112 +485,3 @@ func (me *Peer) GetRemoteRelayers(key crypto.Hash) []*Peer {
}
return relayers
}

func (m *relayersMap) Get(key crypto.Hash) []crypto.Hash {
m.RLock()
defer m.RUnlock()

var relayers []crypto.Hash
for _, r := range m.m[key] {
if r.ActiveAt.Add(time.Minute).Before(time.Now()) {
continue
}
relayers = append(relayers, r.Id)
}
return relayers
}

func (m *relayersMap) Add(key crypto.Hash, v crypto.Hash) {
m.Lock()
defer m.Unlock()

var relayers []*remoteRelayer
for _, r := range m.m[key] {
if r.ActiveAt.Add(time.Minute).After(time.Now()) {
relayers = append(relayers, r)
}
}
for _, r := range relayers {
if r.Id == v {
r.ActiveAt = time.Now()
return
}
}
i := slices.IndexFunc(relayers, func(r *remoteRelayer) bool {
return r.Id == v
})
if i < 0 {
relayers = append(relayers, &remoteRelayer{ActiveAt: time.Now(), Id: v})
} else {
relayers[i].ActiveAt = time.Now()
}
m.m[key] = relayers
}

type neighborMap struct {
sync.RWMutex
m map[crypto.Hash]*Peer
}

func (m *neighborMap) Get(key crypto.Hash) *Peer {
m.RLock()
defer m.RUnlock()

return m.m[key]
}

func (m *neighborMap) Delete(key crypto.Hash) {
m.Lock()
defer m.Unlock()

delete(m.m, key)
}

func (m *neighborMap) Set(key crypto.Hash, v *Peer) {
m.Lock()
defer m.Unlock()

m.m[key] = v
}

func (m *neighborMap) Put(key crypto.Hash, v *Peer) bool {
m.Lock()
defer m.Unlock()

if m.m[key] != nil {
return false
}
m.m[key] = v
return true
}

func (m *neighborMap) Slice() []*Peer {
m.Lock()
defer m.Unlock()

var peers []*Peer
for _, p := range m.m {
peers = append(peers, p)
}
return peers
}

func (m *neighborMap) Clear() {
m.Lock()
defer m.Unlock()

for id := range m.m {
delete(m.m, id)
}
}

func (m *neighborMap) RunOnce(key crypto.Hash, v *Peer, f func()) {
m.Lock()
defer m.Unlock()

if m.m[key] != nil {
return
}
m.m[key] = v
go f()
}
155 changes: 155 additions & 0 deletions p2p/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package p2p

import (
"encoding/binary"
"slices"
"sync"
"time"

"github.com/MixinNetwork/mixin/crypto"
"github.com/dgraph-io/ristretto"
)

type confirmMap struct {
cache *ristretto.Cache
}

func (m *confirmMap) contains(key []byte, duration time.Duration) bool {
if key == nil {
return false
}
val, found := m.cache.Get(key)
if found {
ts := time.Unix(0, int64(binary.BigEndian.Uint64(val.([]byte))))
return ts.Add(duration).After(time.Now())
}
return false
}

func (m *confirmMap) store(key []byte, ts time.Time) {
if key == nil {
panic(ts)
}
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(ts.UnixNano()))
m.cache.Set(key, buf, 8)
}

type remoteRelayer struct {
Id crypto.Hash
ActiveAt time.Time
}

type relayersMap struct {
sync.RWMutex
m map[crypto.Hash][]*remoteRelayer
}

func (m *relayersMap) Get(key crypto.Hash) []crypto.Hash {
m.RLock()
defer m.RUnlock()

var relayers []crypto.Hash
for _, r := range m.m[key] {
if r.ActiveAt.Add(time.Minute).Before(time.Now()) {
continue
}
relayers = append(relayers, r.Id)
}
return relayers
}

func (m *relayersMap) Add(key crypto.Hash, v crypto.Hash) {
m.Lock()
defer m.Unlock()

var relayers []*remoteRelayer
for _, r := range m.m[key] {
if r.ActiveAt.Add(time.Minute).After(time.Now()) {
relayers = append(relayers, r)
}
}
for _, r := range relayers {
if r.Id == v {
r.ActiveAt = time.Now()
return
}
}
i := slices.IndexFunc(relayers, func(r *remoteRelayer) bool {
return r.Id == v
})
if i < 0 {
relayers = append(relayers, &remoteRelayer{ActiveAt: time.Now(), Id: v})
} else {
relayers[i].ActiveAt = time.Now()
}
m.m[key] = relayers
}

type neighborMap struct {
sync.RWMutex
m map[crypto.Hash]*Peer
}

func (m *neighborMap) Get(key crypto.Hash) *Peer {
m.RLock()
defer m.RUnlock()

return m.m[key]
}

func (m *neighborMap) Delete(key crypto.Hash) {
m.Lock()
defer m.Unlock()

delete(m.m, key)
}

func (m *neighborMap) Set(key crypto.Hash, v *Peer) {
m.Lock()
defer m.Unlock()

m.m[key] = v
}

func (m *neighborMap) Put(key crypto.Hash, v *Peer) bool {
m.Lock()
defer m.Unlock()

if m.m[key] != nil {
return false
}
m.m[key] = v
return true
}

func (m *neighborMap) Slice() []*Peer {
m.Lock()
defer m.Unlock()

var peers []*Peer
for _, p := range m.m {
peers = append(peers, p)
}
return peers
}

func (m *neighborMap) Clear() {
m.Lock()
defer m.Unlock()

for id := range m.m {
delete(m.m, id)
}
}

func (m *neighborMap) RunOnce(key crypto.Hash, v *Peer, f func()) {
m.Lock()
defer m.Unlock()

if m.m[key] != nil {
return
}
m.m[key] = v
go f()
}

0 comments on commit 534f52c

Please sign in to comment.