diff --git a/eventloop/eventloop.go b/eventloop/eventloop.go index e9e896f0..278bda51 100644 --- a/eventloop/eventloop.go +++ b/eventloop/eventloop.go @@ -16,9 +16,37 @@ import ( "github.com/relab/hotstuff/util/gpool" ) +type handlerOpts struct { + async bool + priority bool +} + +// HandlerOption sets configuration options for event handlers. +type HandlerOption func(*handlerOpts) + +// RunAsync instructs the eventloop to run the handler asynchronously. +func RunAsync() HandlerOption { + return func(ho *handlerOpts) { + ho.async = true + } +} + +// WithPriority instructs the eventloop to prioritize running the handler before others. +// This guarantees that the handler runs before handlers that have not requested priority. +func WithPriority() HandlerOption { + return func(ho *handlerOpts) { + ho.priority = true + } +} + // EventHandler processes an event. type EventHandler func(event any) +type handler struct { + callback EventHandler + opts handlerOpts +} + // EventLoop accepts events of any type and executes relevant event handlers. // It supports registering both observers and handlers based on the type of event that they accept. // The difference between them is that there can be many observers per event type, but only one handler, @@ -28,11 +56,12 @@ type EventLoop struct { mut sync.Mutex + ctx context.Context + eventQ queue waitingEvents map[reflect.Type][]any - handlers map[reflect.Type]EventHandler - observers map[reflect.Type][]EventHandler + handlers map[reflect.Type][]handler tickers map[int]*ticker tickerID int @@ -43,8 +72,7 @@ func New(bufferSize uint) *EventLoop { el := &EventLoop{ eventQ: newQueue(bufferSize), waitingEvents: make(map[reflect.Type][]any), - handlers: make(map[reflect.Type]EventHandler), - observers: make(map[reflect.Type][]EventHandler), + handlers: make(map[reflect.Type][]handler), tickers: make(map[int]*ticker), } return el @@ -64,63 +92,86 @@ func RegisterObserver[E any](eventloop *EventLoop, handler func(event E)) { eventloop.RegisterObserver(zero, func(event any) { handler(event.(E)) }) } -// RegisterHandler registers a handler for events with the same type as the 'eventType' argument. -// There can be only one handler per event type, and the handler is executed after any observers. -func (el *EventLoop) RegisterHandler(eventType any, handler EventHandler) { - el.mut.Lock() - defer el.mut.Unlock() - el.handlers[reflect.TypeOf(eventType)] = handler +// RegisterObserver registers a handler with priority. +// Deprecated: use RegisterHandler and the WithPriority option instead. +func (el *EventLoop) RegisterObserver(eventType any, handler EventHandler) int { + return el.RegisterHandler(eventType, handler, WithPriority()) } -func (el *EventLoop) UnregisterHandler(eventType any) { - el.mut.Lock() - defer el.mut.Unlock() - delete(el.handlers, reflect.TypeOf(eventType)) +// UnregisterObserver unregister a handler. +// Deprecated: use UnregisterHandler instead. +func (el *EventLoop) UnregisterObserver(eventType any, id int) { + el.UnregisterHandler(eventType, id) } -// RegisterObserver registers an observer for events with the same type as the 'eventType' argument. -// The observers are executed before the handler. -// -// Returns an observer ID that can be used to remove the observer later. -func (el *EventLoop) RegisterObserver(eventType any, observer EventHandler) int { +// RegisterHandler registers an event handler. The handler will +func (el *EventLoop) RegisterHandler(eventType any, callback EventHandler, opts ...HandlerOption) int { + h := handler{callback: callback} + + for _, opt := range opts { + opt(&h.opts) + } + el.mut.Lock() defer el.mut.Unlock() t := reflect.TypeOf(eventType) - observers := el.observers[t] + handlers := el.handlers[t] i := 0 - for ; i < len(observers); i++ { - if observers[i] == nil { + for ; i < len(handlers); i++ { + if handlers[i].callback == nil { break } } - if i == len(observers) { - observers = append(observers, observer) + if i == len(handlers) { + handlers = append(handlers, h) } else { - observers[i] = observer + handlers[i] = h } - el.observers[t] = observers + el.handlers[t] = handlers return i } -func (el *EventLoop) UnregisterObserver(eventType any, id int) { +// UnregisterHandler unregisters the handler for the given event type with the given id. +func (el *EventLoop) UnregisterHandler(eventType any, id int) { el.mut.Lock() defer el.mut.Unlock() t := reflect.TypeOf(eventType) - el.observers[t][id] = nil + el.handlers[t][id].callback = nil } // AddEvent adds an event to the event queue. func (el *EventLoop) AddEvent(event any) { if event != nil { el.eventQ.push(event) + el.processEvent(event, true) } } +// Context returns the context associated with the event loop. +// Usually, this context will be the one passed to Run. +// However, if Tick is used instead of Run, Context will return +// the last context that was passed to Tick. +// If neither Run nor Tick have been called, +// Context returns context.Background. +func (el *EventLoop) Context() context.Context { + el.mut.Lock() + defer el.mut.Unlock() + + return el.ctx +} + +func (el *EventLoop) setContext(ctx context.Context) { + el.mut.Lock() + defer el.mut.Unlock() + + el.ctx = ctx +} + // Run runs the event loop. A context object can be provided to stop the event loop. func (el *EventLoop) Run(ctx context.Context) { loop: @@ -138,14 +189,14 @@ loop: el.startTicker(ctx, e.tickerID) continue } - el.processEvent(event) + el.processEvent(event, false) } // HACK: when we get cancelled, we will handle the events that were in the queue at that time before quitting. l := el.eventQ.len() for i := 0; i < l; i++ { event, _ := el.eventQ.pop() - el.processEvent(event) + el.processEvent(event, false) } } @@ -159,7 +210,7 @@ func (el *EventLoop) Tick() bool { if e, ok := event.(startTickerEvent); ok { el.startTicker(context.Background(), e.tickerID) } else { - el.processEvent(event) + el.processEvent(event, false) } return true @@ -168,9 +219,12 @@ func (el *EventLoop) Tick() bool { var handlerListPool = gpool.New(func() []EventHandler { return make([]EventHandler, 0, 10) }) // processEvent dispatches the event to the correct handler. -func (el *EventLoop) processEvent(event any) { +func (el *EventLoop) processEvent(event any, async bool) { t := reflect.TypeOf(event) - defer el.dispatchDelayedEvents(t) + + if !async { + defer el.dispatchDelayedEvents(t) + } if f, ok := event.(func()); ok { f() @@ -179,19 +233,29 @@ func (el *EventLoop) processEvent(event any) { // Must copy handlers to a list so that they can be executed after unlocking the mutex. // Use a pool to reduce memory allocations. + priorityList := handlerListPool.Get() handlerList := handlerListPool.Get() el.mut.Lock() - for _, observer := range el.observers[t] { - if observer != nil { - handlerList = append(handlerList, observer) + for _, handler := range el.handlers[t] { + if handler.opts.async != async || handler.callback == nil { + continue + } + if handler.opts.priority { + priorityList = append(priorityList, handler.callback) + } else { + handlerList = append(handlerList, handler.callback) } - } - if handler, ok := el.handlers[t]; ok { - handlerList = append(handlerList, handler) } el.mut.Unlock() + for _, handler := range priorityList { + handler(event) + } + + priorityList = priorityList[:0] + handlerListPool.Put(priorityList) + for _, handler := range handlerList { handler(event) } @@ -201,18 +265,21 @@ func (el *EventLoop) processEvent(event any) { } func (el *EventLoop) dispatchDelayedEvents(t reflect.Type) { + var ( + events []any + ok bool + ) + el.mut.Lock() - if delayed, ok := el.waitingEvents[t]; ok { - for _, event := range delayed { - el.AddEvent(event) - } + if events, ok = el.waitingEvents[t]; ok { delete(el.waitingEvents, t) - // delayListPool.Put(delayed[:0]) } el.mut.Unlock() -} -// var delayListPool = gpool.New(func() []any { return make([]any, 0, 1) }) + for _, event := range events { + el.AddEvent(event) + } +} // DelayUntil allows us to delay handling of an event until after another event has happened. // The eventType parameter decides the type of event to wait for, and it should be the zero value @@ -224,9 +291,6 @@ func (el *EventLoop) DelayUntil(eventType, event any) { el.mut.Lock() t := reflect.TypeOf(eventType) v := el.waitingEvents[t] - // if v == nil { - // v = delayListPool.Get() - // } v = append(v, event) el.waitingEvents[t] = v el.mut.Unlock()