Skip to content

Commit

Permalink
Timeout contexts
Browse files Browse the repository at this point in the history
Smart contexts that listen for timeouts
  • Loading branch information
johningve committed Aug 19, 2022
1 parent 4c52587 commit 353f603
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 22 deletions.
9 changes: 5 additions & 4 deletions backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *Replica) Vote(cert hotstuff.PartialCert) {
if r.node == nil {
return
}
ctx, cancel := synchronizer.ViewContext(context.Background(), nil, r.eventLoop)
ctx, cancel := synchronizer.TimeoutContext(r.eventLoop.Context(), r.eventLoop)
defer cancel()
pCert := hotstuffpb.PartialCertToProto(cert)
r.node.Vote(ctx, pCert)
Expand All @@ -57,7 +57,7 @@ func (r *Replica) NewView(msg hotstuff.SyncInfo) {
if r.node == nil {
return
}
ctx, cancel := synchronizer.ViewContext(context.Background(), nil, r.eventLoop)
ctx, cancel := synchronizer.TimeoutContext(r.eventLoop.Context(), r.eventLoop)
defer cancel()
r.node.NewView(ctx, hotstuffpb.SyncInfoToProto(msg))
}
Expand Down Expand Up @@ -288,7 +288,7 @@ func (cfg *subConfig) Propose(proposal hotstuff.ProposeMsg) {
if cfg.cfg == nil {
return
}
ctx, cancel := synchronizer.ViewContext(context.Background(), nil, cfg.eventLoop)
ctx, cancel := synchronizer.TimeoutContext(cfg.eventLoop.Context(), cfg.eventLoop)
defer cancel()
cfg.cfg.Propose(
ctx,
Expand All @@ -302,7 +302,8 @@ func (cfg *subConfig) Timeout(msg hotstuff.TimeoutMsg) {
return
}

ctx, cancel := synchronizer.ViewContext(context.Background(), nil, cfg.eventLoop)
// will wait until the second timeout before cancelling
ctx, cancel := synchronizer.TimeoutContext(cfg.eventLoop.Context(), cfg.eventLoop)
defer cancel()

cfg.cfg.Timeout(
Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (chain *blockChain) Get(hash hotstuff.Hash) (block *hotstuff.Block, ok bool
goto done
}

ctx, cancel = synchronizer.ViewContext(context.Background(), nil, chain.eventLoop)
ctx, cancel = synchronizer.TimeoutContext(chain.eventLoop.Context(), chain.eventLoop)
chain.pendingFetch[hash] = cancel

chain.mut.Unlock()
Expand Down
3 changes: 1 addition & 2 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package consensus

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -126,7 +125,7 @@ func (cs *consensusBase) Propose(cert hotstuff.SyncInfo) {
}
}

ctx, cancel := synchronizer.ViewContext(context.Background(), nil, cs.eventLoop)
ctx, cancel := synchronizer.TimeoutContext(cs.eventLoop.Context(), cs.eventLoop)
defer cancel()

cmd, ok := cs.commandQueue.Get(ctx)
Expand Down
14 changes: 10 additions & 4 deletions eventloop/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ type handler struct {
type EventLoop struct {
modules.Implements[*EventLoop]

mut sync.Mutex
eventQ queue

ctx context.Context
mut sync.Mutex // protects the following:

ctx context.Context // set by Run

eventQ queue
waitingEvents map[reflect.Type][]any

handlers map[reflect.Type][]handler
Expand All @@ -70,6 +71,7 @@ type EventLoop struct {
// New returns a new event loop with the requested buffer size.
func New(bufferSize uint) *EventLoop {
el := &EventLoop{
ctx: context.Background(),
eventQ: newQueue(bufferSize),
waitingEvents: make(map[reflect.Type][]any),
handlers: make(map[reflect.Type][]handler),
Expand Down Expand Up @@ -174,6 +176,8 @@ func (el *EventLoop) setContext(ctx context.Context) {

// Run runs the event loop. A context object can be provided to stop the event loop.
func (el *EventLoop) Run(ctx context.Context) {
el.setContext(ctx)

loop:
for {
event, ok := el.eventQ.pop()
Expand Down Expand Up @@ -201,7 +205,9 @@ loop:
}

// Tick processes a single event. Returns true if an event was handled.
func (el *EventLoop) Tick() bool {
func (el *EventLoop) Tick(ctx context.Context) bool {
el.setContext(ctx)

event, ok := el.eventQ.pop()
if !ok {
return false
Expand Down
30 changes: 30 additions & 0 deletions eventloop/eventloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,33 @@ func TestDelayedEvent(t *testing.T) {
}
}
}

func BenchmarkEventLoopWithObservers(b *testing.B) {
el := eventloop.New(100)

for i := 0; i < 100; i++ {
el.RegisterObserver(testEvent(0), func(event any) {
if event.(testEvent) != 1 {
panic("Unexpected value observed")
}
})
}

for i := 0; i < b.N; i++ {
el.AddEvent(testEvent(1))
el.Tick(context.Background())
}
}

func BenchmarkDelay(b *testing.B) {
el := eventloop.New(100)

for i := 0; i < b.N; i++ {
el.DelayUntil(testEvent(0), testEvent(2))
el.DelayUntil(testEvent(0), testEvent(3))
el.AddEvent(testEvent(1))
el.Tick(context.Background())
el.Tick(context.Background())
el.Tick(context.Background())
}
}
6 changes: 2 additions & 4 deletions handel/handel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
package handel

import (
"context"
"math"

"github.com/relab/gorums"
Expand Down Expand Up @@ -104,7 +103,7 @@ func (h *Handel) InitModule(mods *modules.Core) {
})

h.eventLoop.RegisterHandler(disseminateEvent{}, func(e any) {
ctx, cancel := synchronizer.ViewContext(context.Background(), nil, h.eventLoop)
ctx, cancel := synchronizer.ViewContext(h.eventLoop.Context(), h.eventLoop, nil)
defer cancel()
if s, ok := h.sessions[e.(disseminateEvent).sessionID]; ok {
s.sendContributions(ctx)
Expand Down Expand Up @@ -143,8 +142,7 @@ func (h *Handel) Begin(s hotstuff.PartialCert) {
session := h.newSession(s.BlockHash(), s.Signature())
h.sessions[s.BlockHash()] = session

ctx, _ := synchronizer.ViewContext(context.Background(), nil, h.eventLoop)
go session.verifyContributions(ctx)
go session.verifyContributions()
}

type serviceImpl struct {
Expand Down
7 changes: 5 additions & 2 deletions handel/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (s *session) updateIncoming(c contribution) {
level.done = true
s.advanceLevel()
if c.level+1 <= s.h.maxLevel {
ctx, cancel := synchronizer.ViewContext(context.Background(), nil, s.h.eventLoop)
ctx, cancel := synchronizer.ViewContext(s.h.eventLoop.Context(), s.h.eventLoop, nil)
defer cancel()
s.sendFastPath(ctx, c.level+1)
}
Expand Down Expand Up @@ -475,7 +475,10 @@ func (s *session) sendContributionToLevel(ctx context.Context, levelIndex int) {
level.cp[id] += len(s.part.ids)
}

func (s *session) verifyContributions(ctx context.Context) {
func (s *session) verifyContributions() {
ctx, cancel := synchronizer.ViewContext(s.h.eventLoop.Context(), s.h.eventLoop, nil)
defer cancel()

for ctx.Err() == nil {
c, verifyIndiv, ok := s.chooseContribution()
if !ok {
Expand Down
27 changes: 23 additions & 4 deletions synchronizer/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,38 @@ import (
"github.com/relab/hotstuff/eventloop"
)

// This file provides several functions for creating contexts with lifespans that are tied to synchronizer events.

// ViewContext returns a context that is cancelled at the end of a view.
// If view is nil or less than or equal to the current view, the context will be cancelled at the next view change.
func ViewContext(parent context.Context, view *hotstuff.View, eventLoop *eventloop.EventLoop) (context.Context, context.CancelFunc) {
//
// ViewContext should probably not be used for operations running on the event loop, because
func ViewContext(parent context.Context, eventLoop *eventloop.EventLoop, view *hotstuff.View) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(parent)

id := eventLoop.RegisterObserver(ViewChangeEvent{}, func(event any) {
id := eventLoop.RegisterHandler(ViewChangeEvent{}, func(event any) {
if view == nil || event.(ViewChangeEvent).View >= *view {
cancel()
}
})
}, eventloop.RunAsync(), eventloop.WithPriority())

return ctx, func() {
eventLoop.UnregisterHandler(ViewChangeEvent{}, id)
cancel()
}
}

// TimeoutContext returns a context that is cancelled either when a timeout occurs, or when the view changes.
func TimeoutContext(parent context.Context, eventLoop *eventloop.EventLoop) (context.Context, context.CancelFunc) {
// ViewContext handles view-change case.
ctx, cancel := ViewContext(parent, eventLoop, nil)

id := eventLoop.RegisterHandler(TimeoutEvent{}, func(event any) {
cancel()
}, eventloop.RunAsync(), eventloop.WithPriority())

return ctx, func() {
eventLoop.UnregisterObserver(ViewChangeEvent{}, id)
eventLoop.UnregisterHandler(TimeoutEvent{}, id)
cancel()
}
}
2 changes: 1 addition & 1 deletion twins/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (n *Network) tick() {
for _, node := range n.nodes {
node.eventLoop.AddEvent(tick{})
// run each event loop as long as it has events
for node.eventLoop.Tick() {
for node.eventLoop.Tick(context.Background()) {
}
}
}
Expand Down

0 comments on commit 353f603

Please sign in to comment.