diff --git a/internal/blocksync/block_fetch_job.go b/internal/blocksync/block_fetch_job.go index 7c9a455597..fbab35034f 100644 --- a/internal/blocksync/block_fetch_job.go +++ b/internal/blocksync/block_fetch_job.go @@ -96,7 +96,7 @@ func (p *jobGenerator) nextJob(ctx context.Context) (*workerpool.Job, error) { if err != nil { return nil, err } - p.peerStore.PeerUpdate(peer.peerID, ResetMonitor(), AddNumPending(1)) + p.peerStore.Update(peer.peerID, ResetMonitor(), AddNumPending(1)) return workerpool.NewJob(blockFetchJobHandler(p.client, peer, height)), nil } diff --git a/internal/blocksync/block_fetch_job_test.go b/internal/blocksync/block_fetch_job_test.go index 123293ebd6..8ae9a76a15 100644 --- a/internal/blocksync/block_fetch_job_test.go +++ b/internal/blocksync/block_fetch_job_test.go @@ -102,7 +102,7 @@ func (suite *BlockFetchJobTestSuite) TestJobGeneratorNextJob() { logger := log.NewNopLogger() peerStore := NewInMemPeerStore() - peerStore.Put(suite.peer) + peerStore.Put(suite.peer.peerID, suite.peer) jobGen := newJobGenerator(5, logger, suite.client, peerStore) job, err := jobGen.nextJob(ctx) @@ -136,7 +136,7 @@ func (suite *BlockFetchJobTestSuite) TestGeneratorNextJobWaitForPeerAndPushBackH }() nextJobCh <- struct{}{} jobGen.pushBack(9) - peerStore.Put(suite.peer) + peerStore.Put(suite.peer.peerID, suite.peer) nextJobCh <- struct{}{} heightCheck := mock.MatchedBy(func(height int64) bool { return suite.Contains([]int64{5, 9}, height) diff --git a/internal/blocksync/peer_store.go b/internal/blocksync/peer_store.go index 750352ffcc..6bdb246141 100644 --- a/internal/blocksync/peer_store.go +++ b/internal/blocksync/peer_store.go @@ -8,19 +8,15 @@ import ( "golang.org/x/exp/constraints" "github.com/tendermint/tendermint/internal/libs/flowrate" + "github.com/tendermint/tendermint/libs/store" "github.com/tendermint/tendermint/types" ) type ( - // PeerQueryFunc is a function type for peer specification function - PeerQueryFunc func(peer PeerData) bool - // PeerUpdateFunc is a function type for peer update functions - PeerUpdateFunc func(peer *PeerData) // InMemPeerStore in-memory peer store InMemPeerStore struct { mtx sync.RWMutex - peerIDx map[types.NodeID]int - peers []*PeerData + store store.Store[types.NodeID, PeerData] maxHeight int64 } // PeerData uses to keep peer related data like base height and the current height etc @@ -41,56 +37,49 @@ const ( // NewInMemPeerStore creates a new in-memory peer store func NewInMemPeerStore(peers ...PeerData) *InMemPeerStore { mem := &InMemPeerStore{ - peerIDx: make(map[types.NodeID]int), + store: store.NewInMemStore[types.NodeID, PeerData](), } for _, peer := range peers { - mem.Put(peer) + mem.Put(peer.peerID, peer) } return mem } // Get returns peer's data and true if the peer is found otherwise empty structure and false func (p *InMemPeerStore) Get(peerID types.NodeID) (PeerData, bool) { - p.mtx.RLock() - defer p.mtx.RUnlock() - peer, found := p.get(peerID) - if found { - return *peer, true - } - return PeerData{}, false + return p.store.Get(peerID) } -// GetAndRemove combines Get operation and Remove in one call -func (p *InMemPeerStore) GetAndRemove(peerID types.NodeID) (PeerData, bool) { +// GetAndDelete combines Get operation and Delete in one call +func (p *InMemPeerStore) GetAndDelete(peerID types.NodeID) (PeerData, bool) { p.mtx.Lock() defer p.mtx.Unlock() - peer, found := p.get(peerID) - if found { - p.remove(peerID) - return *peer, true + peer, found := p.store.GetAndDelete(peerID) + if found && peer.height == p.maxHeight { + p.updateMaxHeight() } - return PeerData{}, found + return peer, found } // Put adds the peer data to the store if the peer does not exist, otherwise update the current value -func (p *InMemPeerStore) Put(newPeer PeerData) { +func (p *InMemPeerStore) Put(peerID types.NodeID, newPeer PeerData) { + p.store.Put(peerID, newPeer) p.mtx.Lock() defer p.mtx.Unlock() - _, ok := p.get(newPeer.peerID) - if !ok { - p.peers = append(p.peers, &newPeer) - p.peerIDx[newPeer.peerID] = len(p.peers) - 1 - p.maxHeight = max(p.maxHeight, newPeer.height) - return - } - p.update(newPeer) + p.maxHeight = max(p.maxHeight, newPeer.height) } -// Remove removes the peer data from the store -func (p *InMemPeerStore) Remove(peerID types.NodeID) { +// Delete deletes the peer data from the store +func (p *InMemPeerStore) Delete(peerID types.NodeID) { p.mtx.Lock() defer p.mtx.Unlock() - p.remove(peerID) + peer, found := p.store.GetAndDelete(peerID) + if !found { + return + } + if peer.height == p.maxHeight { + p.updateMaxHeight() + } } // MaxHeight looks at all the peers in the store to get the maximum peer height. @@ -100,24 +89,21 @@ func (p *InMemPeerStore) MaxHeight() int64 { return p.maxHeight } -// PeerUpdate applies update functions to the peer if it exists -func (p *InMemPeerStore) PeerUpdate(peerID types.NodeID, updates ...PeerUpdateFunc) { - p.mtx.Lock() - defer p.mtx.Unlock() - peer, found := p.get(peerID) +// Update applies update functions to the peer if it exists +func (p *InMemPeerStore) Update(peerID types.NodeID, updates ...store.UpdateFunc[types.NodeID, PeerData]) { + p.store.Update(peerID, updates...) + peer, found := p.store.Get(peerID) if !found { return } - for _, update := range updates { - update(peer) - } + p.mtx.Lock() + defer p.mtx.Unlock() + p.maxHeight = max(p.maxHeight, peer.height) } // Query finds and returns the copy of peers by specification conditions -func (p *InMemPeerStore) Query(spec PeerQueryFunc, limit int) []*PeerData { - p.mtx.RLock() - defer p.mtx.RUnlock() - return p.query(spec, limit) +func (p *InMemPeerStore) Query(spec store.QueryFunc[types.NodeID, PeerData], limit int) []PeerData { + return p.store.Query(spec, limit) } // FindPeer finds a peer for the request @@ -126,7 +112,7 @@ func (p *InMemPeerStore) Query(spec PeerQueryFunc, limit int) []*PeerData { // 2. the height must be between two values base and height // otherwise return the empty peer data and false func (p *InMemPeerStore) FindPeer(height int64) (PeerData, bool) { - spec := andX( + spec := store.AndX( peerNumPendingCond(maxPendingRequestsPerPeer, "<"), heightBetweenPeerHeightRange(height), ignoreTimedOutPeers(minRecvRate), @@ -135,12 +121,12 @@ func (p *InMemPeerStore) FindPeer(height int64) (PeerData, bool) { if len(peers) == 0 { return PeerData{}, false } - return *peers[0], true + return peers[0], true } // FindTimedoutPeers finds and returns the timed out peers -func (p *InMemPeerStore) FindTimedoutPeers() []*PeerData { - return p.Query(andX( +func (p *InMemPeerStore) FindTimedoutPeers() []PeerData { + return p.Query(store.AndX( peerNumPendingCond(0, ">"), transferRateNotZeroAndLessMinRate(minRecvRate), ), 0) @@ -148,88 +134,29 @@ func (p *InMemPeerStore) FindTimedoutPeers() []*PeerData { // All returns all stored peers in the store func (p *InMemPeerStore) All() []PeerData { - p.mtx.RLock() - defer p.mtx.RUnlock() - ret := make([]PeerData, len(p.peers)) - for i, peer := range p.peers { - ret[i] = *peer - } - return ret + return p.store.All() } // Len returns the count of all stored peers func (p *InMemPeerStore) Len() int { - p.mtx.RLock() - defer p.mtx.RUnlock() - return len(p.peers) + return p.store.Len() } // IsZero returns true if the store doesn't have a peer yet otherwise false func (p *InMemPeerStore) IsZero() bool { - return p.Len() == 0 -} - -func (p *InMemPeerStore) get(peerID types.NodeID) (*PeerData, bool) { - i, ok := p.peerIDx[peerID] - if !ok { - return nil, false - } - return p.peers[i], true -} - -func (p *InMemPeerStore) update(peer PeerData) { - i, ok := p.peerIDx[peer.peerID] - if !ok { - return - } - p.peers[i].height = peer.height - p.peers[i].base = peer.base - p.maxHeight = max(p.maxHeight, peer.height) -} - -func (p *InMemPeerStore) remove(peerID types.NodeID) { - i, ok := p.peerIDx[peerID] - if !ok { - return - } - peer := p.peers[i] - right := p.peers[i+1:] - for j, peer := range right { - p.peerIDx[peer.peerID] = i + j - } - left := p.peers[0:i] - p.peers = append(left, right...) - delete(p.peerIDx, peerID) - if peer.height == p.maxHeight { - p.updateMaxHeight() - } -} - -func (p *InMemPeerStore) query(spec PeerQueryFunc, limit int) []*PeerData { - var res []*PeerData - for _, i := range p.peerIDx { - peer := p.peers[i] - if spec(*peer) { - c := *peer - res = append(res, &c) - if limit > 0 && limit == len(res) { - return res - } - } - } - return res + return p.store.IsZero() } func (p *InMemPeerStore) updateMaxHeight() { p.maxHeight = 0 - for _, peer := range p.peers { + for _, peer := range p.store.All() { p.maxHeight = max(p.maxHeight, peer.height) } } // TODO with fixed worker pool size this condition is not needed anymore -func peerNumPendingCond(val int32, op string) PeerQueryFunc { - return func(peer PeerData) bool { +func peerNumPendingCond(val int32, op string) store.QueryFunc[types.NodeID, PeerData] { + return func(peerID types.NodeID, peer PeerData) bool { switch op { case "<": return peer.numPending < val @@ -240,21 +167,21 @@ func peerNumPendingCond(val int32, op string) PeerQueryFunc { } } -func heightBetweenPeerHeightRange(height int64) PeerQueryFunc { - return func(peer PeerData) bool { +func heightBetweenPeerHeightRange(height int64) store.QueryFunc[types.NodeID, PeerData] { + return func(peerID types.NodeID, peer PeerData) bool { return height >= peer.base && height <= peer.height } } -func transferRateNotZeroAndLessMinRate(minRate int64) PeerQueryFunc { - return func(peer PeerData) bool { +func transferRateNotZeroAndLessMinRate(minRate int64) store.QueryFunc[types.NodeID, PeerData] { + return func(peerID types.NodeID, peer PeerData) bool { curRate := peer.recvMonitor.CurrentTransferRate() return curRate != 0 && curRate < minRate } } -func ignoreTimedOutPeers(minRate int64) PeerQueryFunc { - return func(peer PeerData) bool { +func ignoreTimedOutPeers(minRate int64) store.QueryFunc[types.NodeID, PeerData] { + return func(peerID types.NodeID, peer PeerData) bool { curRate := peer.recvMonitor.CurrentTransferRate() if curRate == 0 { return true @@ -263,17 +190,6 @@ func ignoreTimedOutPeers(minRate int64) PeerQueryFunc { } } -func andX(specs ...PeerQueryFunc) PeerQueryFunc { - return func(peer PeerData) bool { - for _, spec := range specs { - if !spec(peer) { - return false - } - } - return true - } -} - func newPeerData(peerID types.NodeID, base, height int64) PeerData { startAt := time.Now() return PeerData{ @@ -292,15 +208,15 @@ func newPeerMonitor(at time.Time) *flowrate.Monitor { } // AddNumPending adds a value to the numPending field -func AddNumPending(val int32) PeerUpdateFunc { - return func(peer *PeerData) { +func AddNumPending(val int32) store.UpdateFunc[types.NodeID, PeerData] { + return func(peerID types.NodeID, peer *PeerData) { peer.numPending += val } } // UpdateMonitor adds a block size value to the peer monitor if numPending is greater than zero -func UpdateMonitor(recvSize int) PeerUpdateFunc { - return func(peer *PeerData) { +func UpdateMonitor(recvSize int) store.UpdateFunc[types.NodeID, PeerData] { + return func(peerID types.NodeID, peer *PeerData) { if peer.numPending > 0 { peer.recvMonitor.Update(recvSize) } @@ -308,8 +224,8 @@ func UpdateMonitor(recvSize int) PeerUpdateFunc { } // ResetMonitor replaces a peer monitor on a new one if numPending is zero -func ResetMonitor() PeerUpdateFunc { - return func(peer *PeerData) { +func ResetMonitor() store.UpdateFunc[types.NodeID, PeerData] { + return func(peerID types.NodeID, peer *PeerData) { if peer.numPending == 0 { peer.recvMonitor = newPeerMonitor(peer.startAt) } diff --git a/internal/blocksync/peer_store_test.go b/internal/blocksync/peer_store_test.go index d662ce9a0b..9251d17e8f 100644 --- a/internal/blocksync/peer_store_test.go +++ b/internal/blocksync/peer_store_test.go @@ -20,20 +20,20 @@ func TestInMemPeerStoreBasicOperations(t *testing.T) { require.False(t, found) // add a peer to store - inmem.Put(peer) + inmem.Put(peer.peerID, peer) foundPeer, found := inmem.Get(peerID) require.True(t, found) require.Equal(t, peer, foundPeer) // update a peer data updatedPeer := newPeerData(peerID, 100, 200) - inmem.Put(updatedPeer) + inmem.Put(updatedPeer.peerID, updatedPeer) foundPeer, found = inmem.Get(peerID) require.True(t, found) require.Equal(t, updatedPeer.height, foundPeer.height) require.Equal(t, updatedPeer.base, foundPeer.base) - inmem.PeerUpdate(peerID, AddNumPending(1)) + inmem.Update(peerID, AddNumPending(1)) require.Equal(t, int32(0), foundPeer.numPending) foundPeer, found = inmem.Get(peerID) require.True(t, found) @@ -42,7 +42,7 @@ func TestInMemPeerStoreBasicOperations(t *testing.T) { require.Equal(t, 1, inmem.Len()) require.False(t, inmem.IsZero()) - inmem.Remove(peerID) + inmem.Delete(peerID) require.Equal(t, 0, inmem.Len()) require.True(t, inmem.IsZero()) } diff --git a/internal/blocksync/synchronizer.go b/internal/blocksync/synchronizer.go index 442bcbb279..f10c4716f8 100644 --- a/internal/blocksync/synchronizer.go +++ b/internal/blocksync/synchronizer.go @@ -188,7 +188,7 @@ func (s *Synchronizer) consumeJobResult(ctx context.Context) { return } resp := res.Value.(*BlockResponse) - s.peerStore.PeerUpdate(resp.PeerID, AddNumPending(-1), UpdateMonitor(resp.Block.Size())) + s.peerStore.Update(resp.PeerID, AddNumPending(-1), UpdateMonitor(resp.Block.Size())) err = s.addBlock(*resp) if err != nil { s.logger.Error("cannot add a block to the pending list", @@ -266,7 +266,7 @@ func (s *Synchronizer) LastAdvance() time.Time { // AddPeer adds the peer's alleged blockchain base and height func (s *Synchronizer) AddPeer(peer PeerData) { - s.peerStore.Put(peer) + s.peerStore.Put(peer.peerID, peer) } // RemovePeer removes the peer with peerID from the synchronizer. If there's no peer @@ -283,7 +283,7 @@ func (s *Synchronizer) removePeer(peerID types.NodeID) { s.jobGen.pushBack(resp.Block.Height) } } - s.peerStore.Remove(peerID) + s.peerStore.Delete(peerID) } func (s *Synchronizer) applyBlock(ctx context.Context) error { diff --git a/libs/store/mocks/store.go b/libs/store/mocks/store.go index dea945aed5..50722fe4ba 100644 --- a/libs/store/mocks/store.go +++ b/libs/store/mocks/store.go @@ -81,21 +81,49 @@ func (_m *Store[K, V]) GetAndDelete(key K) (V, bool) { return r0, r1 } +// IsZero provides a mock function with given fields: +func (_m *Store[K, V]) IsZero() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Len provides a mock function with given fields: +func (_m *Store[K, V]) Len() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + // Put provides a mock function with given fields: key, data func (_m *Store[K, V]) Put(key K, data V) { _m.Called(key, data) } // Query provides a mock function with given fields: spec, limit -func (_m *Store[K, V]) Query(spec store.QueryFunc[K, V], limit int) []*V { +func (_m *Store[K, V]) Query(spec store.QueryFunc[K, V], limit int) []V { ret := _m.Called(spec, limit) - var r0 []*V - if rf, ok := ret.Get(0).(func(store.QueryFunc[K, V], int) []*V); ok { + var r0 []V + if rf, ok := ret.Get(0).(func(store.QueryFunc[K, V], int) []V); ok { r0 = rf(spec, limit) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*V) + r0 = ret.Get(0).([]V) } } diff --git a/libs/store/store.go b/libs/store/store.go index ed086937ff..c3f7f08bb8 100644 --- a/libs/store/store.go +++ b/libs/store/store.go @@ -14,8 +14,10 @@ type ( Put(key K, data V) Delete(key K) Update(key K, updates ...UpdateFunc[K, V]) - Query(spec QueryFunc[K, V], limit int) []*V + Query(spec QueryFunc[K, V], limit int) []V All() []V + Len() int + IsZero() bool } // QueryFunc is a function type for a specification function QueryFunc[K comparable, V any] func(key K, data V) bool @@ -124,3 +126,15 @@ func (p *InMemStore[K, T]) query(spec QueryFunc[K, T], limit int) []T { } return res } + +// AndX combines multiple specification functions into one +func AndX[K comparable, V any](specs ...QueryFunc[K, V]) QueryFunc[K, V] { + return func(k K, v V) bool { + for _, spec := range specs { + if !spec(k, v) { + return false + } + } + return true + } +}