Skip to content

Commit

Permalink
refactor(blocksync): migrate peer-manager on generic store (#666)
Browse files Browse the repository at this point in the history
* refactor: implement a peer manager and some additional improvements

* refactor: move in-memory store in libs/store package

* refactor: migrate blocksync peer-manager to libs/store

* chore: regenerate mock libs/store/mocks

* refactor: move AndX func to the libs/store
  • Loading branch information
shotonoff authored Aug 21, 2023
1 parent 8fe858f commit d90e058
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 153 deletions.
2 changes: 1 addition & 1 deletion internal/blocksync/block_fetch_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (p *jobGenerator) nextJob(ctx context.Context) (*workerpool.Job, error) {
if err != nil {
return nil, err
}
p.peerStore.PeerUpdate(peer.peerID, ResetMonitor(), AddNumPending(1))
p.peerStore.Update(peer.peerID, ResetMonitor(), AddNumPending(1))
return workerpool.NewJob(blockFetchJobHandler(p.client, peer, height)), nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/blocksync/block_fetch_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (suite *BlockFetchJobTestSuite) TestJobGeneratorNextJob() {

logger := log.NewNopLogger()
peerStore := NewInMemPeerStore()
peerStore.Put(suite.peer)
peerStore.Put(suite.peer.peerID, suite.peer)
jobGen := newJobGenerator(5, logger, suite.client, peerStore)

job, err := jobGen.nextJob(ctx)
Expand Down Expand Up @@ -136,7 +136,7 @@ func (suite *BlockFetchJobTestSuite) TestGeneratorNextJobWaitForPeerAndPushBackH
}()
nextJobCh <- struct{}{}
jobGen.pushBack(9)
peerStore.Put(suite.peer)
peerStore.Put(suite.peer.peerID, suite.peer)
nextJobCh <- struct{}{}
heightCheck := mock.MatchedBy(func(height int64) bool {
return suite.Contains([]int64{5, 9}, height)
Expand Down
192 changes: 54 additions & 138 deletions internal/blocksync/peer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,15 @@ import (
"golang.org/x/exp/constraints"

"github.com/tendermint/tendermint/internal/libs/flowrate"
"github.com/tendermint/tendermint/libs/store"
"github.com/tendermint/tendermint/types"
)

type (
// PeerQueryFunc is a function type for peer specification function
PeerQueryFunc func(peer PeerData) bool
// PeerUpdateFunc is a function type for peer update functions
PeerUpdateFunc func(peer *PeerData)
// InMemPeerStore in-memory peer store
InMemPeerStore struct {
mtx sync.RWMutex
peerIDx map[types.NodeID]int
peers []*PeerData
store store.Store[types.NodeID, PeerData]
maxHeight int64
}
// PeerData uses to keep peer related data like base height and the current height etc
Expand All @@ -41,56 +37,49 @@ const (
// NewInMemPeerStore creates a new in-memory peer store
func NewInMemPeerStore(peers ...PeerData) *InMemPeerStore {
mem := &InMemPeerStore{
peerIDx: make(map[types.NodeID]int),
store: store.NewInMemStore[types.NodeID, PeerData](),
}
for _, peer := range peers {
mem.Put(peer)
mem.Put(peer.peerID, peer)
}
return mem
}

// Get returns peer's data and true if the peer is found otherwise empty structure and false
func (p *InMemPeerStore) Get(peerID types.NodeID) (PeerData, bool) {
p.mtx.RLock()
defer p.mtx.RUnlock()
peer, found := p.get(peerID)
if found {
return *peer, true
}
return PeerData{}, false
return p.store.Get(peerID)
}

// GetAndRemove combines Get operation and Remove in one call
func (p *InMemPeerStore) GetAndRemove(peerID types.NodeID) (PeerData, bool) {
// GetAndDelete combines Get operation and Delete in one call
func (p *InMemPeerStore) GetAndDelete(peerID types.NodeID) (PeerData, bool) {
p.mtx.Lock()
defer p.mtx.Unlock()
peer, found := p.get(peerID)
if found {
p.remove(peerID)
return *peer, true
peer, found := p.store.GetAndDelete(peerID)
if found && peer.height == p.maxHeight {
p.updateMaxHeight()
}
return PeerData{}, found
return peer, found
}

// Put adds the peer data to the store if the peer does not exist, otherwise update the current value
func (p *InMemPeerStore) Put(newPeer PeerData) {
func (p *InMemPeerStore) Put(peerID types.NodeID, newPeer PeerData) {
p.store.Put(peerID, newPeer)
p.mtx.Lock()
defer p.mtx.Unlock()
_, ok := p.get(newPeer.peerID)
if !ok {
p.peers = append(p.peers, &newPeer)
p.peerIDx[newPeer.peerID] = len(p.peers) - 1
p.maxHeight = max(p.maxHeight, newPeer.height)
return
}
p.update(newPeer)
p.maxHeight = max(p.maxHeight, newPeer.height)
}

// Remove removes the peer data from the store
func (p *InMemPeerStore) Remove(peerID types.NodeID) {
// Delete deletes the peer data from the store
func (p *InMemPeerStore) Delete(peerID types.NodeID) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.remove(peerID)
peer, found := p.store.GetAndDelete(peerID)
if !found {
return
}
if peer.height == p.maxHeight {
p.updateMaxHeight()
}
}

// MaxHeight looks at all the peers in the store to get the maximum peer height.
Expand All @@ -100,24 +89,21 @@ func (p *InMemPeerStore) MaxHeight() int64 {
return p.maxHeight
}

// PeerUpdate applies update functions to the peer if it exists
func (p *InMemPeerStore) PeerUpdate(peerID types.NodeID, updates ...PeerUpdateFunc) {
p.mtx.Lock()
defer p.mtx.Unlock()
peer, found := p.get(peerID)
// Update applies update functions to the peer if it exists
func (p *InMemPeerStore) Update(peerID types.NodeID, updates ...store.UpdateFunc[types.NodeID, PeerData]) {
p.store.Update(peerID, updates...)
peer, found := p.store.Get(peerID)
if !found {
return
}
for _, update := range updates {
update(peer)
}
p.mtx.Lock()
defer p.mtx.Unlock()
p.maxHeight = max(p.maxHeight, peer.height)
}

// Query finds and returns the copy of peers by specification conditions
func (p *InMemPeerStore) Query(spec PeerQueryFunc, limit int) []*PeerData {
p.mtx.RLock()
defer p.mtx.RUnlock()
return p.query(spec, limit)
func (p *InMemPeerStore) Query(spec store.QueryFunc[types.NodeID, PeerData], limit int) []PeerData {
return p.store.Query(spec, limit)
}

// FindPeer finds a peer for the request
Expand All @@ -126,7 +112,7 @@ func (p *InMemPeerStore) Query(spec PeerQueryFunc, limit int) []*PeerData {
// 2. the height must be between two values base and height
// otherwise return the empty peer data and false
func (p *InMemPeerStore) FindPeer(height int64) (PeerData, bool) {
spec := andX(
spec := store.AndX(
peerNumPendingCond(maxPendingRequestsPerPeer, "<"),
heightBetweenPeerHeightRange(height),
ignoreTimedOutPeers(minRecvRate),
Expand All @@ -135,101 +121,42 @@ func (p *InMemPeerStore) FindPeer(height int64) (PeerData, bool) {
if len(peers) == 0 {
return PeerData{}, false
}
return *peers[0], true
return peers[0], true
}

// FindTimedoutPeers finds and returns the timed out peers
func (p *InMemPeerStore) FindTimedoutPeers() []*PeerData {
return p.Query(andX(
func (p *InMemPeerStore) FindTimedoutPeers() []PeerData {
return p.Query(store.AndX(
peerNumPendingCond(0, ">"),
transferRateNotZeroAndLessMinRate(minRecvRate),
), 0)
}

// All returns all stored peers in the store
func (p *InMemPeerStore) All() []PeerData {
p.mtx.RLock()
defer p.mtx.RUnlock()
ret := make([]PeerData, len(p.peers))
for i, peer := range p.peers {
ret[i] = *peer
}
return ret
return p.store.All()
}

// Len returns the count of all stored peers
func (p *InMemPeerStore) Len() int {
p.mtx.RLock()
defer p.mtx.RUnlock()
return len(p.peers)
return p.store.Len()
}

// IsZero returns true if the store doesn't have a peer yet otherwise false
func (p *InMemPeerStore) IsZero() bool {
return p.Len() == 0
}

func (p *InMemPeerStore) get(peerID types.NodeID) (*PeerData, bool) {
i, ok := p.peerIDx[peerID]
if !ok {
return nil, false
}
return p.peers[i], true
}

func (p *InMemPeerStore) update(peer PeerData) {
i, ok := p.peerIDx[peer.peerID]
if !ok {
return
}
p.peers[i].height = peer.height
p.peers[i].base = peer.base
p.maxHeight = max(p.maxHeight, peer.height)
}

func (p *InMemPeerStore) remove(peerID types.NodeID) {
i, ok := p.peerIDx[peerID]
if !ok {
return
}
peer := p.peers[i]
right := p.peers[i+1:]
for j, peer := range right {
p.peerIDx[peer.peerID] = i + j
}
left := p.peers[0:i]
p.peers = append(left, right...)
delete(p.peerIDx, peerID)
if peer.height == p.maxHeight {
p.updateMaxHeight()
}
}

func (p *InMemPeerStore) query(spec PeerQueryFunc, limit int) []*PeerData {
var res []*PeerData
for _, i := range p.peerIDx {
peer := p.peers[i]
if spec(*peer) {
c := *peer
res = append(res, &c)
if limit > 0 && limit == len(res) {
return res
}
}
}
return res
return p.store.IsZero()
}

func (p *InMemPeerStore) updateMaxHeight() {
p.maxHeight = 0
for _, peer := range p.peers {
for _, peer := range p.store.All() {
p.maxHeight = max(p.maxHeight, peer.height)
}
}

// TODO with fixed worker pool size this condition is not needed anymore
func peerNumPendingCond(val int32, op string) PeerQueryFunc {
return func(peer PeerData) bool {
func peerNumPendingCond(val int32, op string) store.QueryFunc[types.NodeID, PeerData] {
return func(peerID types.NodeID, peer PeerData) bool {
switch op {
case "<":
return peer.numPending < val
Expand All @@ -240,21 +167,21 @@ func peerNumPendingCond(val int32, op string) PeerQueryFunc {
}
}

func heightBetweenPeerHeightRange(height int64) PeerQueryFunc {
return func(peer PeerData) bool {
func heightBetweenPeerHeightRange(height int64) store.QueryFunc[types.NodeID, PeerData] {
return func(peerID types.NodeID, peer PeerData) bool {
return height >= peer.base && height <= peer.height
}
}

func transferRateNotZeroAndLessMinRate(minRate int64) PeerQueryFunc {
return func(peer PeerData) bool {
func transferRateNotZeroAndLessMinRate(minRate int64) store.QueryFunc[types.NodeID, PeerData] {
return func(peerID types.NodeID, peer PeerData) bool {
curRate := peer.recvMonitor.CurrentTransferRate()
return curRate != 0 && curRate < minRate
}
}

func ignoreTimedOutPeers(minRate int64) PeerQueryFunc {
return func(peer PeerData) bool {
func ignoreTimedOutPeers(minRate int64) store.QueryFunc[types.NodeID, PeerData] {
return func(peerID types.NodeID, peer PeerData) bool {
curRate := peer.recvMonitor.CurrentTransferRate()
if curRate == 0 {
return true
Expand All @@ -263,17 +190,6 @@ func ignoreTimedOutPeers(minRate int64) PeerQueryFunc {
}
}

func andX(specs ...PeerQueryFunc) PeerQueryFunc {
return func(peer PeerData) bool {
for _, spec := range specs {
if !spec(peer) {
return false
}
}
return true
}
}

func newPeerData(peerID types.NodeID, base, height int64) PeerData {
startAt := time.Now()
return PeerData{
Expand All @@ -292,24 +208,24 @@ func newPeerMonitor(at time.Time) *flowrate.Monitor {
}

// AddNumPending adds a value to the numPending field
func AddNumPending(val int32) PeerUpdateFunc {
return func(peer *PeerData) {
func AddNumPending(val int32) store.UpdateFunc[types.NodeID, PeerData] {
return func(peerID types.NodeID, peer *PeerData) {
peer.numPending += val
}
}

// UpdateMonitor adds a block size value to the peer monitor if numPending is greater than zero
func UpdateMonitor(recvSize int) PeerUpdateFunc {
return func(peer *PeerData) {
func UpdateMonitor(recvSize int) store.UpdateFunc[types.NodeID, PeerData] {
return func(peerID types.NodeID, peer *PeerData) {
if peer.numPending > 0 {
peer.recvMonitor.Update(recvSize)
}
}
}

// ResetMonitor replaces a peer monitor on a new one if numPending is zero
func ResetMonitor() PeerUpdateFunc {
return func(peer *PeerData) {
func ResetMonitor() store.UpdateFunc[types.NodeID, PeerData] {
return func(peerID types.NodeID, peer *PeerData) {
if peer.numPending == 0 {
peer.recvMonitor = newPeerMonitor(peer.startAt)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/blocksync/peer_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ func TestInMemPeerStoreBasicOperations(t *testing.T) {
require.False(t, found)

// add a peer to store
inmem.Put(peer)
inmem.Put(peer.peerID, peer)
foundPeer, found := inmem.Get(peerID)
require.True(t, found)
require.Equal(t, peer, foundPeer)

// update a peer data
updatedPeer := newPeerData(peerID, 100, 200)
inmem.Put(updatedPeer)
inmem.Put(updatedPeer.peerID, updatedPeer)
foundPeer, found = inmem.Get(peerID)
require.True(t, found)
require.Equal(t, updatedPeer.height, foundPeer.height)
require.Equal(t, updatedPeer.base, foundPeer.base)

inmem.PeerUpdate(peerID, AddNumPending(1))
inmem.Update(peerID, AddNumPending(1))
require.Equal(t, int32(0), foundPeer.numPending)
foundPeer, found = inmem.Get(peerID)
require.True(t, found)
Expand All @@ -42,7 +42,7 @@ func TestInMemPeerStoreBasicOperations(t *testing.T) {
require.Equal(t, 1, inmem.Len())
require.False(t, inmem.IsZero())

inmem.Remove(peerID)
inmem.Delete(peerID)
require.Equal(t, 0, inmem.Len())
require.True(t, inmem.IsZero())
}
Expand Down
Loading

0 comments on commit d90e058

Please sign in to comment.