diff --git a/app/core/event/queue.go b/app/core/event/queue.go index a9d838c..4bf9da7 100644 --- a/app/core/event/queue.go +++ b/app/core/event/queue.go @@ -1,25 +1,31 @@ package event import ( - "github.com/aimerny/kook-go/app/core/model" "math" + "sync" + + "github.com/aimerny/kook-go/app/core/model" ) type EventQueue struct { - events map[int]*model.Signal + events *sync.Map MinSn int + mu *sync.RWMutex } func NewEventQueue() *EventQueue { return &EventQueue{ - events: make(map[int]*model.Signal), + events: &sync.Map{}, MinSn: math.MaxInt, + mu: &sync.RWMutex{}, } } // Push returns min sn func (eq *EventQueue) Push(signal *model.Signal) int { - eq.events[signal.SerialNumber] = signal + eq.mu.Lock() + defer eq.mu.Unlock() + eq.events.Store(signal.SerialNumber, signal) if signal.SerialNumber < eq.MinSn { eq.MinSn = signal.SerialNumber } @@ -27,29 +33,47 @@ func (eq *EventQueue) Push(signal *model.Signal) int { } func (eq *EventQueue) Pop() *model.Signal { - if signal, ok := eq.events[eq.MinSn]; ok { - eq.events[eq.MinSn] = nil - delete(eq.events, eq.MinSn) - eq.resetMinKey() - return signal + eq.mu.Lock() + defer eq.mu.Unlock() + if signal, ok := eq.events.Load(eq.MinSn); ok { + if signal, ok := signal.(*model.Signal); ok { + eq.events.Delete(eq.MinSn) + eq.resetMinKey() + return signal + } } return nil } func (eq *EventQueue) IsEmpty() bool { - return len(eq.events) == 0 + empty := true + eq.events.Range(func(key, value interface{}) bool { + empty = false + return false + }) + return empty } func (eq *EventQueue) Clear() { - eq.events = make(map[int]*model.Signal) + eq.mu.Lock() + defer eq.mu.Unlock() + eq.events.Range(func(k, v interface{}) bool { + eq.events.Delete(k) + return true + }) eq.MinSn = math.MaxInt } func (eq *EventQueue) resetMinKey() { + eq.MinSn = math.MaxInt - for k := range eq.events { - if k < eq.MinSn { - eq.MinSn = k + eq.events.Range(func(k, v interface{}) bool { + if k, ok := k.(int); ok { + if k < eq.MinSn { + eq.MinSn = k + } } - } + return true + }) + }