diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go index 2d8a23d8..7f06bcd6 100644 --- a/pkg/infradb/subscriberframework/eventbus/eventbus.go +++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go @@ -9,6 +9,7 @@ import ( "log" "sort" "sync" + "fmt" ) // EBus holds the EventBus object @@ -89,7 +90,7 @@ func (e *EventBus) Subscribe(moduleName, eventType string, priority int, eventHa subscriber := &Subscriber{ Name: moduleName, - Ch: make(chan interface{}, 1), + Ch: make(chan interface{}), Quit: make(chan bool), Priority: priority, } @@ -128,10 +129,20 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool { } // Publish api notifies the subscribers with certain eventType -func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) { +func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error { e.publishL.RLock() defer e.publishL.RUnlock() - subscriber.Ch <- objectData + var err error + // We need the default case here as if the subscriber is busy then we will not be able to send to the subscriber channel + // and the Publish function will get stuck. So the default case serves exactly this purpose. + select { + case subscriber.Ch <- objectData: + log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name) + default: + log.Printf("Publish(): Channel for subsriber %s is busy. 
Notification not sent", subscriber.Name) + err = fmt.Errorf("Channel is busy") + } + return err } // Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out) diff --git a/pkg/infradb/taskmanager/taskmanager.go b/pkg/infradb/taskmanager/taskmanager.go index 3f32c633..cb11a9ae 100644 --- a/pkg/infradb/taskmanager/taskmanager.go +++ b/pkg/infradb/taskmanager/taskmanager.go @@ -31,7 +31,8 @@ type Task struct { objectType string resourceVersion string subIndex int - retryTimer time.Duration + // systemTimer is used only when we want to retry a Task due to unavailability of the Subscriber or not receiving a TaskStatus + systemTimer time.Duration subs []*eventbus.Subscriber } @@ -60,6 +61,7 @@ func newTask(name, objectType, resourceVersion string, subs []*eventbus.Subscrib objectType: objectType, resourceVersion: resourceVersion, subIndex: 0, + systemTimer: 1 * time.Second, subs: subs, } } @@ -94,13 +96,18 @@ func (t *TaskManager) CreateTask(name, objectType, resourceVersion string, subs // StatusUpdated creates a task status and sends it for handling func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificationID string, dropTask bool, component *common.Component) { taskStatus := newTaskStatus(name, objectType, resourceVersion, notificationID, dropTask, component) - - // Do we need to make this call happen in a goroutine in order to not make the - // StatusUpdated function stuck in case that nobody reads what is written on the channel ? - // Is there any case where this can happen - // (nobody reads what is written on the channel and the StatusUpdated gets stuck) ? - t.taskStatusChan <- taskStatus - log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus) + log.Printf("StatusUpdated(): New Task Status has been created: %+v\n", taskStatus) + + // We need to have a default case here so the call is not stuck if we send to channel but there is nobody reading from the channel. 
+ // e.g. a subscriber got stuck and doesn't reply. The task will be requeued after the timer expires. In the meantime + // the subscriber replies and a taskStatus is sent to channel but the call gets stuck there as the previous task has not been requeued yet + // as the timer has not expired and the queue is empty (We assume that there is only one task in the queue). + select { + case t.taskStatusChan <- taskStatus: + log.Printf("StatusUpdated(): Task Status has been sent to channel: %+v\n", taskStatus) + default: + log.Printf("StatusUpdated(): Task Status has not been sent to channel. Channel not available: %+v\n", taskStatus) + } } // processTasks processes the task @@ -123,7 +130,18 @@ func (t *TaskManager) processTasks() { // (e.g. Maybe you have a timeout on the subscribers and you got the notification after the timeout have passed) NotificationID: uuid.NewString(), } - eventbus.EBus.Publish(objectData, sub) + if err := eventbus.EBus.Publish(objectData, sub); err != nil { + log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. Subscriber is busy. The Task %+v will be requeued.\n", sub, objectData, task) + // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task + // so we do start again from the subscriber that returned an error or was unavailable for any reason. 
+ task.subIndex += i + task.systemTimer *= 2 + log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer) + time.AfterFunc(task.systemTimer, func() { + t.taskQueue.Enqueue(task) + }) + break loopTwo + } log.Printf("processTasks(): Notification has been sent to subscriber %+v with data %+v\n", sub, objectData) loopThree: @@ -143,11 +161,17 @@ func (t *TaskManager) processTasks() { log.Printf("processTasks(): received notification id %+v doesn't equal the sent notification id %+v\n", taskStatus.notificationID, objectData.NotificationID) // We need a timeout in case that the subscriber doesn't update the status at all for whatever reason. - // If that occurs then we just take a note which subscriber need to revisit and we requeue the task without any timer + // If that occurs then we just requeue the task with a timer case <-time.After(30 * time.Second): - log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued immediately Task Status %+v\n", sub, task, taskStatus) + log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued. Task Status %+v\n", sub, task, taskStatus) + // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task + // so we do start again from the subscriber that returned an error or was unavailable for any reason. task.subIndex += i - go t.taskQueue.Enqueue(task) + task.systemTimer *= 2 + log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer) + time.AfterFunc(task.systemTimer, func() { + t.taskQueue.Enqueue(task) + }) break loopThree } } @@ -159,19 +183,27 @@ func (t *TaskManager) processTasks() { break loopTwo } + // We re-initialize the systemTimer every time that we get a taskStatus. 
That means that the subscriber is available and has responded + task.systemTimer = 1 * time.Second + switch taskStatus.component.CompStatus { case common.ComponentStatusSuccess: log.Printf("processTasks(): Subscriber %+v has processed the task %+v successfully\n", sub, task) continue loopTwo - default: + case common.ComponentStatusError: log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task) + log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer) + // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task + // so we do start again from the subscriber that returned an error or was unavailable for any reason. task.subIndex += i - task.retryTimer = taskStatus.component.Timer - log.Printf("processTasks(): The Task will be requeued after %+v\n", task.retryTimer) - time.AfterFunc(task.retryTimer, func() { + time.AfterFunc(taskStatus.component.Timer, func() { t.taskQueue.Enqueue(task) }) break loopTwo + default: + log.Printf("processTasks(): Subscriber %+v has not provided designated status for the task %+v\n", sub, task) + log.Printf("processTasks(): The task %+v will be dropped\n", task) + break loopTwo } } }