Skip to content

Commit

Permalink
clean vbft event timer (#1446)
Browse files Browse the repository at this point in the history
  • Loading branch information
laizy authored Jul 31, 2024
1 parent 4dacf0a commit 1f6a45a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 218 deletions.
157 changes: 30 additions & 127 deletions consensus/vbft/event_timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package vbft

import (
"container/heap"
"fmt"
"math/rand"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -79,17 +77,14 @@ type EventTimer struct {

// peer heartbeat tickers
peerTickers map[uint32]*time.Timer
// other timers
normalTimers map[uint32]*time.Timer
}

func NewEventTimer(server *Server) *EventTimer {
timer := &EventTimer{
server: server,
C: make(chan *TimerEvent, 64),
eventTimers: make(map[TimerEventType]perBlockTimer),
peerTickers: make(map[uint32]*time.Timer),
normalTimers: make(map[uint32]*time.Timer),
server: server,
C: make(chan *TimerEvent, 64),
eventTimers: make(map[TimerEventType]perBlockTimer),
peerTickers: make(map[uint32]*time.Timer),
}

for i := 0; i < int(EventMax); i++ {
Expand All @@ -114,42 +109,6 @@ func (self *EventTimer) stop() {
stopAllTimers(self.eventTimers[TimerEventType(i)])
self.eventTimers[TimerEventType(i)] = make(map[uint32]*time.Timer)
}

// clear normal timers
stopAllTimers(self.normalTimers)
self.normalTimers = make(map[uint32]*time.Timer)
}

func (self *EventTimer) StartTimer(Idx uint32, timeout time.Duration) {
self.lock.Lock()
defer self.lock.Unlock()

if t, present := self.normalTimers[Idx]; present {
t.Stop()
log.Infof("timer for %d got reset", Idx)
}

self.normalTimers[Idx] = time.AfterFunc(timeout, func() {
// remove timer from map
self.lock.Lock()
defer self.lock.Unlock()
delete(self.normalTimers, Idx)

self.C <- &TimerEvent{
evtType: EventMax,
blockNum: Idx,
}
})
}

func (self *EventTimer) CancelTimer(idx uint32) {
self.lock.Lock()
defer self.lock.Unlock()

if t, present := self.normalTimers[idx]; present {
t.Stop()
delete(self.normalTimers, idx)
}
}

func (self *EventTimer) getEventTimeout(evtType TimerEventType) time.Duration {
Expand Down Expand Up @@ -180,15 +139,13 @@ func (self *EventTimer) getEventTimeout(evtType TimerEventType) time.Duration {
return time.Duration(txPooltimeout)
case EventTxBlockTimeout:
return time.Duration(atomic.LoadInt64(&zeroTxBlockTimeout))
default:
panic("unknown timer event type")
}

return 0
}

//
// internal helper, should call with lock held
//
func (self *EventTimer) startEventTimer(evtType TimerEventType, blockNum uint32) error {
func (self *EventTimer) startEventTimer(evtType TimerEventType, blockNum uint32) {
timers := self.eventTimers[evtType]
if t, present := timers[blockNum]; present {
t.Stop()
Expand All @@ -198,21 +155,18 @@ func (self *EventTimer) startEventTimer(evtType TimerEventType, blockNum uint32)

timeout := self.getEventTimeout(evtType)
if timeout == 0 {
log.Errorf("invalid timeout for event %d, blkNum %d", evtType, blockNum)
return fmt.Errorf("invalid timeout for event %d, blkNum %d", evtType, blockNum)
// never happen when config correctly
timeout = time.Second
}
timers[blockNum] = time.AfterFunc(timeout, func() {
self.C <- &TimerEvent{
evtType: evtType,
blockNum: blockNum,
}
})
return nil
}

//
// internal helper, should call with lock held
//
func (self *EventTimer) cancelEventTimer(evtType TimerEventType, blockNum uint32) {
timers := self.eventTimers[evtType]

Expand All @@ -222,12 +176,12 @@ func (self *EventTimer) cancelEventTimer(evtType TimerEventType, blockNum uint32
}
}

func (self *EventTimer) StartProposalTimer(blockNum uint32) error {
func (self *EventTimer) StartProposalTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

log.Infof("server %d started proposal timer for blk %d", self.server.Index, blockNum)
return self.startEventTimer(EventProposeBlockTimeout, blockNum)
self.startEventTimer(EventProposeBlockTimeout, blockNum)
}

func (self *EventTimer) CancelProposalTimer(blockNum uint32) {
Expand All @@ -237,12 +191,12 @@ func (self *EventTimer) CancelProposalTimer(blockNum uint32) {
self.cancelEventTimer(EventProposeBlockTimeout, blockNum)
}

func (self *EventTimer) StartEndorsingTimer(blockNum uint32) error {
func (self *EventTimer) StartEndorsingTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

log.Infof("server %d started endorsing timer for blk %d", self.server.Index, blockNum)
return self.startEventTimer(EventEndorseBlockTimeout, blockNum)
self.startEventTimer(EventEndorseBlockTimeout, blockNum)
}

func (self *EventTimer) CancelEndorseMsgTimer(blockNum uint32) {
Expand All @@ -252,12 +206,12 @@ func (self *EventTimer) CancelEndorseMsgTimer(blockNum uint32) {
self.cancelEventTimer(EventEndorseBlockTimeout, blockNum)
}

func (self *EventTimer) StartEndorseEmptyBlockTimer(blockNum uint32) error {
func (self *EventTimer) StartEndorseEmptyBlockTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

log.Infof("server %d started empty endorsing timer for blk %d", self.server.Index, blockNum)
return self.startEventTimer(EventEndorseEmptyBlockTimeout, blockNum)
self.startEventTimer(EventEndorseEmptyBlockTimeout, blockNum)
}

func (self *EventTimer) CancelEndorseEmptyBlockTimer(blockNum uint32) {
Expand All @@ -267,12 +221,12 @@ func (self *EventTimer) CancelEndorseEmptyBlockTimer(blockNum uint32) {
self.cancelEventTimer(EventEndorseEmptyBlockTimeout, blockNum)
}

func (self *EventTimer) StartCommitTimer(blockNum uint32) error {
func (self *EventTimer) StartCommitTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

log.Infof("server %d started commit timer for blk %d", self.server.Index, blockNum)
return self.startEventTimer(EventCommitBlockTimeout, blockNum)
self.startEventTimer(EventCommitBlockTimeout, blockNum)
}

func (self *EventTimer) CancelCommitMsgTimer(blockNum uint32) {
Expand All @@ -282,11 +236,11 @@ func (self *EventTimer) CancelCommitMsgTimer(blockNum uint32) {
self.cancelEventTimer(EventCommitBlockTimeout, blockNum)
}

func (self *EventTimer) StartProposalBackoffTimer(blockNum uint32) error {
func (self *EventTimer) StartProposalBackoffTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventProposalBackoff, blockNum)
self.startEventTimer(EventProposalBackoff, blockNum)
}

func (self *EventTimer) CancelProposalBackoffTimer(blockNum uint32) {
Expand All @@ -296,11 +250,11 @@ func (self *EventTimer) CancelProposalBackoffTimer(blockNum uint32) {
self.cancelEventTimer(EventProposalBackoff, blockNum)
}

func (self *EventTimer) StartBackoffTimer(blockNum uint32) error {
func (self *EventTimer) StartBackoffTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventRandomBackoff, blockNum)
self.startEventTimer(EventRandomBackoff, blockNum)
}

func (self *EventTimer) CancelBackoffTimer(blockNum uint32) {
Expand All @@ -310,11 +264,11 @@ func (self *EventTimer) CancelBackoffTimer(blockNum uint32) {
self.cancelEventTimer(EventRandomBackoff, blockNum)
}

func (self *EventTimer) Start2ndProposalTimer(blockNum uint32) error {
func (self *EventTimer) Start2ndProposalTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventPropose2ndBlockTimeout, blockNum)
self.startEventTimer(EventPropose2ndBlockTimeout, blockNum)
}

func (self *EventTimer) Cancel2ndProposalTimer(blockNum uint32) {
Expand All @@ -334,11 +288,11 @@ func (self *EventTimer) onBlockSealed(blockNum uint32) {
}
}

func (self *EventTimer) StartTxBlockTimeout(blockNum uint32) error {
func (self *EventTimer) StartTxBlockTimeout(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventTxBlockTimeout, blockNum)
self.startEventTimer(EventTxBlockTimeout, blockNum)
}

func (self *EventTimer) CancelTxBlockTimeout(blockNum uint32) {
Expand All @@ -348,7 +302,7 @@ func (self *EventTimer) CancelTxBlockTimeout(blockNum uint32) {
self.cancelEventTimer(EventTxBlockTimeout, blockNum)
}

func (self *EventTimer) startPeerTicker(peerIdx uint32) error {
func (self *EventTimer) startPeerTicker(peerIdx uint32) {
self.lock.Lock()
defer self.lock.Unlock()

Expand All @@ -365,79 +319,28 @@ func (self *EventTimer) startPeerTicker(peerIdx uint32) error {
}
self.peerTickers[peerIdx].Reset(timeout)
})

return nil
}

func (self *EventTimer) stopPeerTicker(peerIdx uint32) error {
func (self *EventTimer) stopPeerTicker(peerIdx uint32) {
self.lock.Lock()
defer self.lock.Unlock()

if p, present := self.peerTickers[peerIdx]; present {
p.Stop()
delete(self.peerTickers, peerIdx)
}
return nil
}

func (self *EventTimer) startTxTicker(blockNum uint32) error {
func (self *EventTimer) startTxPoolTicker(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventTxPool, blockNum)
self.startEventTimer(EventTxPool, blockNum)
}

func (self *EventTimer) stopTxTicker(blockNum uint32) {
func (self *EventTimer) stopTxPoolTicker(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

self.cancelEventTimer(EventTxPool, blockNum)
}

///////////////////////////////////////////////////////////
//
// timer queue
//
///////////////////////////////////////////////////////////

type TimerItem struct {
due time.Time
evt *TimerEvent
index int
}

type TimerQueue []*TimerItem

func (tq TimerQueue) Len() int {
return len(tq)
}

func (tq TimerQueue) Less(i, j int) bool {
return tq[j].due.After(tq[i].due)
}

func (tq TimerQueue) Swap(i, j int) {
tq[i], tq[j] = tq[j], tq[i]
tq[i].index = i
tq[j].index = j
}

func (tq *TimerQueue) Push(x interface{}) {
item := x.(*TimerItem)
item.index = len(*tq)
*tq = append(*tq, item)
}

func (tq *TimerQueue) Pop() interface{} {
old := *tq
n := len(old)
item := old[n-1]
item.index = -1
*tq = old[0 : n-1]
return item
}

func (tq *TimerQueue) update(item *TimerItem, due time.Time) {
item.due = due
heap.Fix(tq, item.index)
}
16 changes: 2 additions & 14 deletions consensus/vbft/event_timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,13 @@ func constructEventTimer() *EventTimer {
return NewEventTimer(server)
}

func TestStartTimer(t *testing.T) {
eventtimer := constructEventTimer()
eventtimer.StartTimer(1, 10)
}

func TestCancelTimer(t *testing.T) {
eventtimer := constructEventTimer()
eventtimer.StartTimer(1, 10)
eventtimer.CancelTimer(1)
}
func TestStartEventTimer(t *testing.T) {
eventtimer := constructEventTimer()
err := eventtimer.startEventTimer(EventProposeBlockTimeout, 1)
t.Logf("TestStartEventTimer: %v", err)
eventtimer.startEventTimer(EventProposeBlockTimeout, 1)
}

func TestCancelEventTimer(t *testing.T) {
eventtimer := constructEventTimer()
err := eventtimer.startEventTimer(EventProposeBlockTimeout, 1)
t.Logf("startEventTimer: %v", err)
eventtimer.startEventTimer(EventProposeBlockTimeout, 1)
eventtimer.cancelEventTimer(EventProposeBlockTimeout, 1)
}
Loading

0 comments on commit 1f6a45a

Please sign in to comment.