Skip to content

Commit

Permalink
fix(evpn-bridge): fix system behaviour for pending objects
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitrios Markou <[email protected]>
  • Loading branch information
mardim91 committed Jul 14, 2024
1 parent 708f948 commit ae178ed
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 19 deletions.
17 changes: 14 additions & 3 deletions pkg/infradb/subscriberframework/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log"
"sort"
"sync"
"fmt"
)

// EBus holds the EventBus object
Expand Down Expand Up @@ -89,7 +90,7 @@ func (e *EventBus) Subscribe(moduleName, eventType string, priority int, eventHa

subscriber := &Subscriber{
Name: moduleName,
Ch: make(chan interface{}, 1),
Ch: make(chan interface{}),
Quit: make(chan bool),
Priority: priority,
}
Expand Down Expand Up @@ -128,10 +129,20 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool {
}

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) {
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error {
e.publishL.RLock()
defer e.publishL.RUnlock()
subscriber.Ch <- objectData
var err error
// We need the default case here as if the subscriber is busy then we will not be able to sent to the subscriber channel
// and the Publish function will stuck. So the default case serves exctly this purpose.
select {
case subscriber.Ch <- objectData:
log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name)
default:
log.Printf("Publish(): Channel for subsriber %s is busy. Notification not sent", subscriber.Name)
err = fmt.Errorf("Channel is busy")

Check failure on line 143 in pkg/infradb/subscriberframework/eventbus/eventbus.go

View workflow job for this annotation

GitHub Actions / call / golangci

ST1005: error strings should not be capitalized (stylecheck)
}
return err
}

// Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out)
Expand Down
64 changes: 48 additions & 16 deletions pkg/infradb/taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type Task struct {
objectType string
resourceVersion string
subIndex int
retryTimer time.Duration
// systemTimer is used only when we want to retry a Task due to unavailability of the Subscriber or not receiving a TaskStatus
systemTimer time.Duration

Check failure on line 35 in pkg/infradb/taskmanager/taskmanager.go

View workflow job for this annotation

GitHub Actions / call / golangci

File is not `gofmt`-ed with `-s` (gofmt)
subs []*eventbus.Subscriber
}

Expand Down Expand Up @@ -60,6 +61,7 @@ func newTask(name, objectType, resourceVersion string, subs []*eventbus.Subscrib
objectType: objectType,
resourceVersion: resourceVersion,
subIndex: 0,
systemTimer: 1 * time.Second,
subs: subs,
}
}
Expand Down Expand Up @@ -94,13 +96,18 @@ func (t *TaskManager) CreateTask(name, objectType, resourceVersion string, subs
// StatusUpdated creates a task status and sends it for handling
func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificationID string, dropTask bool, component *common.Component) {
taskStatus := newTaskStatus(name, objectType, resourceVersion, notificationID, dropTask, component)

// Do we need to make this call happen in a goroutine in order to not make the
// StatusUpdated function stuck in case that nobody reads what is written on the channel ?
// Is there any case where this can happen
// (nobody reads what is written on the channel and the StatusUpdated gets stuck) ?
t.taskStatusChan <- taskStatus
log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus)
log.Printf("StatusUpdated(): New Task Status has been created: %+v\n", taskStatus)

// We need to have a default case here so the call is not stuck if we send to channel but there is nobody reading from the channel.
// e.g. a subscriber got stuck and doesn't reply. The task will be requeued after the timer gets expired. In the meanwhile
// the subscriber replies and a taskStatus is sent to channel but the call gets stuck there as the previous task has not been requeued yet
// as the timer has not expired and the queue is empty (We assume that there is only one task in the queue).
select {
case t.taskStatusChan <- taskStatus:
log.Printf("StatusUpdated(): Task Status has been sent to channel: %+v\n", taskStatus)
default:
log.Printf("StatusUpdated(): Task Status has not been sent to channel. Channel not available: %+v\n", taskStatus)
}
}

// processTasks processes the task
Expand All @@ -123,7 +130,18 @@ func (t *TaskManager) processTasks() {
// (e.g. Maybe you have a timeout on the subscribers and you got the notification after the timeout have passed)
NotificationID: uuid.NewString(),
}
eventbus.EBus.Publish(objectData, sub)
if err := eventbus.EBus.Publish(objectData, sub); err != nil {
log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. Subscriber is busy. The Task %+v will be requeued.\n", sub, objectData, task)
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
// so we do start again from the subscriber that returned an error or was unavailable for any reason.
task.subIndex += i
task.systemTimer *= 2
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
time.AfterFunc(task.systemTimer, func() {
t.taskQueue.Enqueue(task)
})
break loopTwo
}
log.Printf("processTasks(): Notification has been sent to subscriber %+v with data %+v\n", sub, objectData)

loopThree:
Expand All @@ -143,11 +161,17 @@ func (t *TaskManager) processTasks() {
log.Printf("processTasks(): received notification id %+v doesn't equal the sent notification id %+v\n", taskStatus.notificationID, objectData.NotificationID)

// We need a timeout in case that the subscriber doesn't update the status at all for whatever reason.
// If that occurs then we just take a note which subscriber need to revisit and we requeue the task without any timer
// If that occurs then we just requeue the task with a timer
case <-time.After(30 * time.Second):
log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued immediately Task Status %+v\n", sub, task, taskStatus)
log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued. Task Status %+v\n", sub, task, taskStatus)
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
// so we do start again from the subscriber that returned an error or was unavailable for any reason.
task.subIndex += i
go t.taskQueue.Enqueue(task)
task.systemTimer *= 2
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
time.AfterFunc(task.systemTimer, func() {
t.taskQueue.Enqueue(task)
})
break loopThree
}
}
Expand All @@ -159,19 +183,27 @@ func (t *TaskManager) processTasks() {
break loopTwo
}

// We re-initialize the systemTimer every time that we get a taskStatus. That means that the subscriber is available and has responded
task.systemTimer = 1 * time.Second

switch taskStatus.component.CompStatus {
case common.ComponentStatusSuccess:
log.Printf("processTasks(): Subscriber %+v has processed the task %+v successfully\n", sub, task)
continue loopTwo
default:
case common.ComponentStatusError:
log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task)
log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer)
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
// so we do start again from the subscriber that returned an error or was unavailable for any reason.
task.subIndex += i
task.retryTimer = taskStatus.component.Timer
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.retryTimer)
time.AfterFunc(task.retryTimer, func() {
time.AfterFunc(taskStatus.component.Timer, func() {
t.taskQueue.Enqueue(task)
})
break loopTwo
default:
log.Printf("processTasks(): Subscriber %+v has not provided designated status for the task %+v\n", sub, task)
log.Printf("processTasks(): The task %+v will be dropped\n", task)
break loopTwo
}
}
}
Expand Down

0 comments on commit ae178ed

Please sign in to comment.