Skip to content

Commit

Permalink
fix(evpn-bridge): second review
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitrios Markou <[email protected]>
  • Loading branch information
mardim91 committed Aug 22, 2024
1 parent 1a5cb19 commit 78e794e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
3 changes: 0 additions & 3 deletions pkg/infradb/subscriberframework/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type EventBus struct {
subscribers map[string][]*Subscriber
eventHandlers map[string]EventHandler
subscriberL sync.RWMutex
publishL sync.RWMutex
mutex sync.RWMutex
}

Expand Down Expand Up @@ -130,8 +129,6 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool {

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error {
e.publishL.Lock()
defer e.publishL.Unlock()
var err error

select {
Expand Down
11 changes: 9 additions & 2 deletions pkg/infradb/taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (t *TaskManager) processTasks() {
task := t.taskQueue.Dequeue()
log.Printf("processTasks(): Task has been dequeued for processing: %+v\n", task)

// A new sub-list of the initial subscribers list will be generated based on the value of the subIndex.
// This sub-list can be equal to the initial list (subIndex is equal to zero) or smaller than the initial
// list (subIndex greater than zero) in case a requeue event occurred.
subsToIterate := task.subs[task.subIndex:]
loopTwo:
for i, sub := range subsToIterate {
Expand All @@ -134,7 +137,9 @@ func (t *TaskManager) processTasks() {
log.Printf("processTasks(): Failed to sent notification: %+v\n", err)
log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. The Task %+v will be requeued.\n", sub, objectData, task)
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
// so we do start again from the subscriber that returned an error or was unavailable for any reason.
// so we do start again from the subscriber that returned an error or was unavailable for any reason. The increasing
// of the subIndex value will be always correct as after the requeue of the task we generate and iterate on a new sub-list
// of the remaining subscribers which is equal or smaller than the initial subscribers list.
task.subIndex += i
task.systemTimer *= 2
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
Expand Down Expand Up @@ -195,7 +200,9 @@ func (t *TaskManager) processTasks() {
log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task)
log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer)
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
// so we do start again from the subscriber that returned an error or was unavailable for any reason.
// so we do start again from the subscriber that returned an error or was unavailable for any reason. The increasing
// of the subIndex value will be always correct as after the requeue of the task we generate and iterate on a new sub-list
// of the remaining subscribers which is equal or smaller than the initial subscribers list.
task.subIndex += i
time.AfterFunc(taskStatus.component.Timer, func() {
t.taskQueue.Enqueue(task)
Expand Down

0 comments on commit 78e794e

Please sign in to comment.