Skip to content

Commit

Permalink
eventloop: more flexible handlers
Browse files Browse the repository at this point in the history
This commit aims to make the eventloop more flexible by allowing both
async and sync handlers again. Handlers can be made async by passing a
RunAsync handler option. Furthermore, observers have been replaced by a
WithPriority handler option.
  • Loading branch information
johningve committed Aug 19, 2022
1 parent 9d8ded3 commit 4c52587
Showing 1 changed file with 113 additions and 49 deletions.
162 changes: 113 additions & 49 deletions eventloop/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,37 @@ import (
"github.com/relab/hotstuff/util/gpool"
)

type handlerOpts struct {
async bool
priority bool
}

// HandlerOption sets configuration options for event handlers.
type HandlerOption func(*handlerOpts)

// RunAsync instructs the eventloop to run the handler asynchronously.
func RunAsync() HandlerOption {
return func(ho *handlerOpts) {
ho.async = true
}
}

// WithPriority instructs the eventloop to prioritize running the handler before others.
// This guarantees that the handler runs before handlers that have not requested priority.
func WithPriority() HandlerOption {
return func(ho *handlerOpts) {
ho.priority = true
}
}

// EventHandler processes an event.
type EventHandler func(event any)

type handler struct {
callback EventHandler
opts handlerOpts
}

// EventLoop accepts events of any type and executes relevant event handlers.
// It supports registering both observers and handlers based on the type of event that they accept.
// The difference between them is that there can be many observers per event type, but only one handler,
Expand All @@ -28,11 +56,12 @@ type EventLoop struct {

mut sync.Mutex

ctx context.Context

eventQ queue
waitingEvents map[reflect.Type][]any

handlers map[reflect.Type]EventHandler
observers map[reflect.Type][]EventHandler
handlers map[reflect.Type][]handler

tickers map[int]*ticker
tickerID int
Expand All @@ -43,8 +72,7 @@ func New(bufferSize uint) *EventLoop {
el := &EventLoop{
eventQ: newQueue(bufferSize),
waitingEvents: make(map[reflect.Type][]any),
handlers: make(map[reflect.Type]EventHandler),
observers: make(map[reflect.Type][]EventHandler),
handlers: make(map[reflect.Type][]handler),
tickers: make(map[int]*ticker),
}
return el
Expand All @@ -64,63 +92,86 @@ func RegisterObserver[E any](eventloop *EventLoop, handler func(event E)) {
eventloop.RegisterObserver(zero, func(event any) { handler(event.(E)) })
}

// RegisterHandler registers a handler for events with the same type as the 'eventType' argument.
// There can be only one handler per event type, and the handler is executed after any observers.
func (el *EventLoop) RegisterHandler(eventType any, handler EventHandler) {
el.mut.Lock()
defer el.mut.Unlock()
el.handlers[reflect.TypeOf(eventType)] = handler
// RegisterObserver registers a handler with priority.
// Deprecated: use RegisterHandler and the WithPriority option instead.
func (el *EventLoop) RegisterObserver(eventType any, handler EventHandler) int {
return el.RegisterHandler(eventType, handler, WithPriority())
}

func (el *EventLoop) UnregisterHandler(eventType any) {
el.mut.Lock()
defer el.mut.Unlock()
delete(el.handlers, reflect.TypeOf(eventType))
// UnregisterObserver unregister a handler.
// Deprecated: use UnregisterHandler instead.
func (el *EventLoop) UnregisterObserver(eventType any, id int) {
el.UnregisterHandler(eventType, id)
}

// RegisterObserver registers an observer for events with the same type as the 'eventType' argument.
// The observers are executed before the handler.
//
// Returns an observer ID that can be used to remove the observer later.
func (el *EventLoop) RegisterObserver(eventType any, observer EventHandler) int {
// RegisterHandler registers an event handler. The handler will
func (el *EventLoop) RegisterHandler(eventType any, callback EventHandler, opts ...HandlerOption) int {
h := handler{callback: callback}

for _, opt := range opts {
opt(&h.opts)
}

el.mut.Lock()
defer el.mut.Unlock()
t := reflect.TypeOf(eventType)

observers := el.observers[t]
handlers := el.handlers[t]

i := 0
for ; i < len(observers); i++ {
if observers[i] == nil {
for ; i < len(handlers); i++ {
if handlers[i].callback == nil {
break
}
}

if i == len(observers) {
observers = append(observers, observer)
if i == len(handlers) {
handlers = append(handlers, h)
} else {
observers[i] = observer
handlers[i] = h
}

el.observers[t] = observers
el.handlers[t] = handlers

return i
}

func (el *EventLoop) UnregisterObserver(eventType any, id int) {
// UnregisterHandler unregisters the handler for the given event type with the given id.
func (el *EventLoop) UnregisterHandler(eventType any, id int) {
el.mut.Lock()
defer el.mut.Unlock()
t := reflect.TypeOf(eventType)
el.observers[t][id] = nil
el.handlers[t][id].callback = nil
}

// AddEvent adds an event to the event queue.
func (el *EventLoop) AddEvent(event any) {
if event != nil {
el.eventQ.push(event)
el.processEvent(event, true)
}
}

// Context returns the context associated with the event loop.
// Usually, this context will be the one passed to Run.
// However, if Tick is used instead of Run, Context will return
// the last context that was passed to Tick.
// If neither Run nor Tick have been called,
// Context returns context.Background.
func (el *EventLoop) Context() context.Context {
el.mut.Lock()
defer el.mut.Unlock()

return el.ctx
}

func (el *EventLoop) setContext(ctx context.Context) {
el.mut.Lock()
defer el.mut.Unlock()

el.ctx = ctx
}

// Run runs the event loop. A context object can be provided to stop the event loop.
func (el *EventLoop) Run(ctx context.Context) {
loop:
Expand All @@ -138,14 +189,14 @@ loop:
el.startTicker(ctx, e.tickerID)
continue
}
el.processEvent(event)
el.processEvent(event, false)
}

// HACK: when we get cancelled, we will handle the events that were in the queue at that time before quitting.
l := el.eventQ.len()
for i := 0; i < l; i++ {
event, _ := el.eventQ.pop()
el.processEvent(event)
el.processEvent(event, false)
}
}

Expand All @@ -159,7 +210,7 @@ func (el *EventLoop) Tick() bool {
if e, ok := event.(startTickerEvent); ok {
el.startTicker(context.Background(), e.tickerID)
} else {
el.processEvent(event)
el.processEvent(event, false)
}

return true
Expand All @@ -168,9 +219,12 @@ func (el *EventLoop) Tick() bool {
var handlerListPool = gpool.New(func() []EventHandler { return make([]EventHandler, 0, 10) })

// processEvent dispatches the event to the correct handler.
func (el *EventLoop) processEvent(event any) {
func (el *EventLoop) processEvent(event any, async bool) {
t := reflect.TypeOf(event)
defer el.dispatchDelayedEvents(t)

if !async {
defer el.dispatchDelayedEvents(t)
}

if f, ok := event.(func()); ok {
f()
Expand All @@ -179,19 +233,29 @@ func (el *EventLoop) processEvent(event any) {

// Must copy handlers to a list so that they can be executed after unlocking the mutex.
// Use a pool to reduce memory allocations.
priorityList := handlerListPool.Get()
handlerList := handlerListPool.Get()

el.mut.Lock()
for _, observer := range el.observers[t] {
if observer != nil {
handlerList = append(handlerList, observer)
for _, handler := range el.handlers[t] {
if handler.opts.async != async || handler.callback == nil {
continue
}
if handler.opts.priority {
priorityList = append(priorityList, handler.callback)
} else {
handlerList = append(handlerList, handler.callback)
}
}
if handler, ok := el.handlers[t]; ok {
handlerList = append(handlerList, handler)
}
el.mut.Unlock()

for _, handler := range priorityList {
handler(event)
}

priorityList = priorityList[:0]
handlerListPool.Put(priorityList)

for _, handler := range handlerList {
handler(event)
}
Expand All @@ -201,18 +265,21 @@ func (el *EventLoop) processEvent(event any) {
}

func (el *EventLoop) dispatchDelayedEvents(t reflect.Type) {
var (
events []any
ok bool
)

el.mut.Lock()
if delayed, ok := el.waitingEvents[t]; ok {
for _, event := range delayed {
el.AddEvent(event)
}
if events, ok = el.waitingEvents[t]; ok {
delete(el.waitingEvents, t)
// delayListPool.Put(delayed[:0])
}
el.mut.Unlock()
}

// var delayListPool = gpool.New(func() []any { return make([]any, 0, 1) })
for _, event := range events {
el.AddEvent(event)
}
}

// DelayUntil allows us to delay handling of an event until after another event has happened.
// The eventType parameter decides the type of event to wait for, and it should be the zero value
Expand All @@ -224,9 +291,6 @@ func (el *EventLoop) DelayUntil(eventType, event any) {
el.mut.Lock()
t := reflect.TypeOf(eventType)
v := el.waitingEvents[t]
// if v == nil {
// v = delayListPool.Get()
// }
v = append(v, event)
el.waitingEvents[t] = v
el.mut.Unlock()
Expand Down

0 comments on commit 4c52587

Please sign in to comment.