diff --git a/pkg/infradb/subscriberframework/eventbus/event_bus_test.go b/pkg/infradb/subscriberframework/eventbus/event_bus_test.go new file mode 100644 index 00000000..d50418fc --- /dev/null +++ b/pkg/infradb/subscriberframework/eventbus/event_bus_test.go @@ -0,0 +1,184 @@ +package eventbus + +import ( + "log" + "sync" + "testing" + "time" +) + +type moduleCiHandler struct { + receivedEvents []*ObjectData + sync.Mutex +} + +// Constants +const ( + TestEvent = "testEvent" + TestEventpriority = "testEventpriority" + TestEventChBusy = "testEventChBusy" + TestEventUnsub = "testEventUnsub" +) + +func (h *moduleCiHandler) HandleEvent(eventType string, objectData *ObjectData) { + h.Lock() + defer h.Unlock() + h.receivedEvents = append(h.receivedEvents, objectData) + switch eventType { + case TestEvent: + case TestEventpriority: + case TestEventChBusy: + case TestEventUnsub: + log.Printf("received event type %s", eventType) + default: + log.Panicf("error: Unknown event type %s", eventType) + } +} + +func TestSubscribeAndPublish(t *testing.T) { + handler := &moduleCiHandler{} + + EventBus := NewEventBus() + EventBus.StartSubscriber("testModule", TestEvent, 1, handler) + time.Sleep(10 * time.Millisecond) + + objectData := &ObjectData{ + ResourceVersion: "v1", + Name: "testObject", + NotificationID: "123", + } + + subscribers := EventBus.GetSubscribers(TestEvent) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEvent'") + } + subscriber := subscribers[0] + + err := EventBus.Publish(objectData, subscriber) + if err != nil { + t.Errorf("Publish() failed with error: %v", err) + } + + time.Sleep(10 * time.Millisecond) + handler.Lock() + if len(handler.receivedEvents) != 1 { + t.Errorf("Event was not received by the handler as expected") + } + if handler.receivedEvents[0] != objectData { + t.Errorf("Event data was not received by the handler as expected") + } + handler.Unlock() + + EventBus.Unsubscribe(subscriber) +} + +func TestPriorityOrderWithStartSubscriber(t *testing.T) { + handler1 := &moduleCiHandler{} + handler2 := &moduleCiHandler{} + + EventBus := NewEventBus() + + EventBus.StartSubscriber("testModule1", TestEventpriority, 2, handler1) + EventBus.StartSubscriber("testModule2", TestEventpriority, 1, handler2) + + time.Sleep(10 * time.Millisecond) + + subscribers := EventBus.GetSubscribers(TestEventpriority) + if len(subscribers) != 2 { + t.Errorf("Expected 2 subscribers, got %d", len(subscribers)) + } + if subscribers[0].Priority > subscribers[1].Priority { + t.Errorf("Subscribers are not sorted by priority") + } + + for _, sub := range subscribers { + EventBus.Unsubscribe(sub) + } +} +func TestPublishChannelBusyWithStartSubscriber(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleChBusy", TestEventChBusy, 1, handler) + + time.Sleep(10 * time.Millisecond) + + subscribers := EventBus.GetSubscribers(TestEventChBusy) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEventChBusy'") + } + subscriber := subscribers[0] + + subscriber.Ch <- &ObjectData{} + + objectData := &ObjectData{ + ResourceVersion: "v1", + Name: "testObject", + NotificationID: "123", + } + err := EventBus.Publish(objectData, subscriber) + if err == nil { + t.Errorf("Expected an error when publishing to a busy channel, but got nil") + } + + EventBus.Unsubscribe(subscriber) +} +func TestUnsubscribeEvent(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleUnsub", TestEventUnsub, 1, handler) + + subscribers := EventBus.GetSubscribers(TestEventUnsub) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEventUnsub'") + } + subscriber := subscribers[0] + + EventBus.UnsubscribeEvent(subscriber, TestEventUnsub) + + subscribers = EventBus.GetSubscribers(TestEventUnsub) + for _, sub := range subscribers { + if sub == subscriber { + t.Errorf("Subscriber was not successfully unsubscribed") + } + } +} + +func TestUnsubscribe(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleUnsub", TestEventUnsub, 1, handler) + + subscribers := EventBus.GetSubscribers(TestEventUnsub) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEventUnsub'") + } + subscriber := subscribers[0] + + EventBus.Unsubscribe(subscriber) + + select { + case _, ok := <-subscriber.Ch: + if ok { + t.Errorf("Subscriber's channel should be closed, but it's not") + } + default: + } +} + +func TestSubscriberAlreadyExist(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleSubExist", "testEventSubExist", 3, handler) + + exists := EventBus.subscriberExist("testEventSubExist", "testModuleSubExist") + if !exists { + t.Errorf("subscriberExist should return true for existing subscriber") + } + + subscribers := EventBus.GetSubscribers("testEventSubExist") + for _, sub := range subscribers { + if sub.Name == "testModuleSubExist" { + EventBus.Unsubscribe(sub) + } + } +} diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go index 2d8a23d8..21e9cd1b 100644 --- a/pkg/infradb/subscriberframework/eventbus/eventbus.go +++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go @@ -6,6 +6,7 @@ package eventbus import ( + "fmt" "log" "sort" "sync" @@ -19,7 +20,6 @@ type EventBus struct { subscribers map[string][]*Subscriber eventHandlers map[string]EventHandler subscriberL sync.RWMutex - publishL sync.RWMutex mutex sync.RWMutex } @@ -52,10 +52,13 @@ func (e *EventBus) StartSubscriber(moduleName, eventType string, priority int, e for { select { case event := <-subscriber.Ch: - log.Printf("\nSubscriber %s for %s received \n", moduleName, eventType) + log.Printf("\nSubscriber %s for %s received \n", moduleName, eventType) handlerKey := moduleName + "." + eventType - if handler, ok := e.eventHandlers[handlerKey]; ok { + e.subscriberL.Lock() + handler, ok := e.eventHandlers[handlerKey] + e.subscriberL.Unlock() + if ok { if objectData, ok := event.(*ObjectData); ok { handler.HandleEvent(eventType, objectData) } else { @@ -65,6 +68,7 @@ func (e *EventBus) StartSubscriber(moduleName, eventType string, priority int, e } else { subscriber.Ch <- "error: no event handler found" } + case <-subscriber.Quit: close(subscriber.Ch) return @@ -89,7 +93,7 @@ func (e *EventBus) Subscribe(moduleName, eventType string, priority int, eventHa subscriber := &Subscriber{ Name: moduleName, - Ch: make(chan interface{}, 1), + Ch: make(chan interface{}), Quit: make(chan bool), Priority: priority, } @@ -128,10 +132,16 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool { } // Publish api notifies the subscribers with certain eventType -func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) { - e.publishL.RLock() - defer e.publishL.RUnlock() - subscriber.Ch <- objectData +func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error { + var err error + + select { + case subscriber.Ch <- objectData: + log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name) + default: + err = fmt.Errorf("channel for subscriber %s is busy", subscriber.Name) + } + return err } // Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out) diff --git a/pkg/infradb/taskmanager/taskmanager.go b/pkg/infradb/taskmanager/taskmanager.go index 3f32c633..a831f98a 100644 --- a/pkg/infradb/taskmanager/taskmanager.go +++ b/pkg/infradb/taskmanager/taskmanager.go @@ -31,8 +31,9 @@ type Task struct { objectType string resourceVersion string subIndex int - retryTimer time.Duration - subs []*eventbus.Subscriber + // systemTimer is used only when we want to retry a Task due to unavailability of the Subscriber or not receiving a TaskStatus + systemTimer time.Duration + subs []*eventbus.Subscriber } // TaskStatus holds info related to the status that has been received @@ -60,6 +61,7 @@ func newTask(name, objectType, resourceVersion string, subs []*eventbus.Subscrib objectType: objectType, resourceVersion: resourceVersion, subIndex: 0, + systemTimer: 1 * time.Second, subs: subs, } } @@ -94,13 +96,18 @@ func (t *TaskManager) CreateTask(name, objectType, resourceVersion string, subs // StatusUpdated creates a task status and sends it for handling func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificationID string, dropTask bool, component *common.Component) { taskStatus := newTaskStatus(name, objectType, resourceVersion, notificationID, dropTask, component) - - // Do we need to make this call happen in a goroutine in order to not make the - // StatusUpdated function stuck in case that nobody reads what is written on the channel ? - // Is there any case where this can happen - // (nobody reads what is written on the channel and the StatusUpdated gets stuck) ? - t.taskStatusChan <- taskStatus - log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus) + log.Printf("StatusUpdated(): New Task Status has been created: %+v\n", taskStatus) + + // We need to have a default case here so the call is not stuck if we send to channel but there is nobody reading from the channel. + // e.g. a subscriber got stuck and doesn't reply. The task will be requeued after the timer gets expired. In the meanwhile + // the subscriber replies and a taskStatus is sent to channel but the call gets stuck there as the previous task has not been requeued yet + // as the timer has not expired and the queue is empty (We assume that there is only one task in the queue). + select { + case t.taskStatusChan <- taskStatus: + log.Printf("StatusUpdated(): Task Status has been sent to channel: %+v\n", taskStatus) + default: + log.Printf("StatusUpdated(): Task Status has not been sent to channel. Channel not available: %+v\n", taskStatus) + } } // processTasks processes the task @@ -111,6 +118,9 @@ func (t *TaskManager) processTasks() { task := t.taskQueue.Dequeue() log.Printf("processTasks(): Task has been dequeued for processing: %+v\n", task) + // A new sub-list of the initial subscribers list will be generated based on the value of the subIndex. + // This sub-list can be equal to the initial list (subIndex is equal to zero) or smaller than the initial + // list (subIndex greater than zero) in case a requeue event occurred. subsToIterate := task.subs[task.subIndex:] loopTwo: for i, sub := range subsToIterate { @@ -123,7 +133,21 @@ func (t *TaskManager) processTasks() { // (e.g. Maybe you have a timeout on the subscribers and you got the notification after the timeout have passed) NotificationID: uuid.NewString(), } - eventbus.EBus.Publish(objectData, sub) + if err := eventbus.EBus.Publish(objectData, sub); err != nil { + log.Printf("processTasks(): Failed to sent notification: %+v\n", err) + log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. The Task %+v will be requeued.\n", sub, objectData, task) + // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task + // so we do start again from the subscriber that returned an error or was unavailable for any reason. The increasing + // of the subIndex value will be always correct as after the requeue of the task we generate and iterate on a new sub-list + // of the remaining subscribers which is equal or smaller than the initial subscribers list. + task.subIndex += i + task.systemTimer *= 2 + log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer) + time.AfterFunc(task.systemTimer, func() { + t.taskQueue.Enqueue(task) + }) + break loopTwo + } log.Printf("processTasks(): Notification has been sent to subscriber %+v with data %+v\n", sub, objectData) loopThree: @@ -143,11 +167,17 @@ func (t *TaskManager) processTasks() { log.Printf("processTasks(): received notification id %+v doesn't equal the sent notification id %+v\n", taskStatus.notificationID, objectData.NotificationID) // We need a timeout in case that the subscriber doesn't update the status at all for whatever reason. - // If that occurs then we just take a note which subscriber need to revisit and we requeue the task without any timer + // If that occurs then we just requeue the task with a timer case <-time.After(30 * time.Second): - log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued immediately Task Status %+v\n", sub, task, taskStatus) + log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued. Task Status %+v\n", sub, task, taskStatus) + // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task + // so we do start again from the subscriber that returned an error or was unavailable for any reason. task.subIndex += i - go t.taskQueue.Enqueue(task) + task.systemTimer *= 2 + log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer) + time.AfterFunc(task.systemTimer, func() { + t.taskQueue.Enqueue(task) + }) break loopThree } } @@ -159,19 +189,29 @@ func (t *TaskManager) processTasks() { break loopTwo } + // We re-initialize the systemTimer every time that we get a taskStatus. That means that the subscriber is available and has responded + task.systemTimer = 1 * time.Second + switch taskStatus.component.CompStatus { case common.ComponentStatusSuccess: log.Printf("processTasks(): Subscriber %+v has processed the task %+v successfully\n", sub, task) continue loopTwo - default: + case common.ComponentStatusError: log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task) + log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer) + // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task + // so we do start again from the subscriber that returned an error or was unavailable for any reason. The increasing + // of the subIndex value will be always correct as after the requeue of the task we generate and iterate on a new sub-list + // of the remaining subscribers which is equal or smaller than the initial subscribers list. task.subIndex += i - task.retryTimer = taskStatus.component.Timer - log.Printf("processTasks(): The Task will be requeued after %+v\n", task.retryTimer) - time.AfterFunc(task.retryTimer, func() { + time.AfterFunc(taskStatus.component.Timer, func() { t.taskQueue.Enqueue(task) }) break loopTwo + default: + log.Printf("processTasks(): Subscriber %+v has not provided designated status for the task %+v\n", sub, task) + log.Printf("processTasks(): The task %+v will be dropped\n", task) + break loopTwo } } } diff --git a/pkg/infradb/taskmanager/taskmanager_test.go b/pkg/infradb/taskmanager/taskmanager_test.go new file mode 100644 index 00000000..8a2830b8 --- /dev/null +++ b/pkg/infradb/taskmanager/taskmanager_test.go @@ -0,0 +1,288 @@ +package taskmanager + +import ( + "log" + "sync" + "testing" + "time" + + "github.com/opiproject/opi-evpn-bridge/pkg/infradb/common" + "github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus" + "github.com/stretchr/testify/assert" +) + +var ( + retValMu sync.Mutex + retVal bool +) + +type moduleCiHandler struct { + receivedEvents []*eventbus.ObjectData +} + +func handleTestEvent(objectData *eventbus.ObjectData) { + name := "testTask" + objectType := "testType" + resourceVersion := "testVersion" + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusSuccess, + } + + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func handleTestEventCompSuccess(objectData *eventbus.ObjectData) { + name := "testTaskCompSuccess" + objectType := "testTypeCompSuccess" + resourceVersion := "testVersionCompSuccess" + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusSuccess, + } + + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func handleTestEventbusy(wg *sync.WaitGroup) { + name := "testTaskbusy" + objectType := "testTypebusy" + resourceVersion := "testVersionbusy" + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusSuccess, + } + + TaskMan.StatusUpdated(name, objectType, resourceVersion, "notificationID", dropTask, component) + + retValMu.Lock() + retVal = true + retValMu.Unlock() + + wg.Done() +} +func handletestEventTimeout() { + +} +func handletestNotificationIDNotMatching(objectData *eventbus.ObjectData) { + name := "testTaskNotificationIdNotMatching" + objectType := "testTypeNotificationIdNotMatching" + resourceVersion := "testVersionNotificationIdNotMatching" + dropTask := false + + component := &common.Component{ + Name: "testModuleNotificationIdNotMatching", + CompStatus: common.ComponentStatusSuccess, + } + TaskMan.StatusUpdated(name, objectType, resourceVersion, "NotificationIdNotMatching", dropTask, component) + + time.Sleep(100 * time.Millisecond) + + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func handleTestEventError(objectData *eventbus.ObjectData) { + name := "testTaskEventError" + objectType := "testTypeEventError" + resourceVersion := "testVersionEventError" + + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusError, + } + if component.Timer == 0 { + component.Timer = 2 * time.Second + } else { + component.Timer *= 2 + } + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func (h *moduleCiHandler) HandleEvent(eventType string, objectData *eventbus.ObjectData) { + h.receivedEvents = append(h.receivedEvents, objectData) + switch eventType { + case "testEvent": + handleTestEvent(objectData) + case "testEventCompSuccess": + handleTestEventCompSuccess(objectData) + case "testEventError": + handleTestEventError(objectData) + case "testEventTimeout": + handletestEventTimeout() + case "testEventNotificationIdNotMatching": + handletestNotificationIDNotMatching(objectData) + default: + log.Printf("LCI: error: Unknown event type %s", eventType) + } +} + +func TestCreateTask(t *testing.T) { + subscriber := &eventbus.Subscriber{ + Name: "testSubscriber", + Ch: make(chan interface{}), + Quit: make(chan bool), + Priority: 1, + } + subs := []*eventbus.Subscriber{subscriber} + tm := newTaskManager() + tm.StartTaskManager() + tm.CreateTask("testTask", "testType", "testVersion", subs) + + time.Sleep(100 * time.Millisecond) + + task := tm.taskQueue.Dequeue() + assert.NotNil(t, task) + assert.Equal(t, "testTask", task.name) + assert.Equal(t, "testType", task.objectType) + assert.Equal(t, "testVersion", task.resourceVersion) + assert.Equal(t, subs, task.subs) +} + +func TestCompSuccess(t *testing.T) { + var wg sync.WaitGroup + + TaskMan.StartTaskManager() + handler := &moduleCiHandler{} + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleCompSuccess", "testEventCompSuccess", 1, handler) + }() + + wg.Wait() + + subscribers := eventbus.EBus.GetSubscribers("testEventCompSuccess") + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEvent'") + } + TaskMan.CreateTask("testTaskCompSuccess", "testTypeCompSuccess", "testVersionCompSuccess", subscribers) + + select { + case task := <-TaskMan.taskQueue.channel: + if task.name == "testTaskCompSuccess" { + t.Errorf("assert failed:") + } + default: + } +} + +func TestCompError(t *testing.T) { + var wg sync.WaitGroup + + TaskMan.StartTaskManager() + + handler := &moduleCiHandler{} + + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleEventError", "testEventError", 1, handler) + }() + + wg.Wait() + + subscribers := eventbus.EBus.GetSubscribers("testEventError") + + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEventError'") + } + + TaskMan.CreateTask("testTaskEventError", "testTypeEventError", "testVersionEventError", subscribers) + + task := TaskMan.taskQueue.Dequeue() + + assert.NotNil(t, task, "Task should not be nil") + assert.Equal(t, "testTaskEventError", task.name, "Task name should match") + assert.Equal(t, "testTypeEventError", task.objectType, "Task object type should match") + assert.Equal(t, "testVersionEventError", task.resourceVersion, "Task resource version should match") + assert.Equal(t, subscribers, task.subs, "Task subscribers should match") +} + +func TestTimeout(t *testing.T) { + var wg sync.WaitGroup + + handler := &moduleCiHandler{} + TaskMan.StartTaskManager() + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleTimeout", "testEventTimeout", 1, handler) + }() + + // Wait for both the TaskManager and the subscriber to be started + wg.Wait() + subscribers := eventbus.EBus.GetSubscribers("testEventTimeout") + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEventTimeout'") + } + TaskMan.CreateTask("testTaskTimeout", "testTypeTimeout", "testVersionTimeout", subscribers) + + time.Sleep(35 * time.Second) + + task := TaskMan.taskQueue.Dequeue() + + assert.NotNil(t, task) + assert.Equal(t, "testTaskTimeout", task.name) + assert.Equal(t, "testTypeTimeout", task.objectType) + assert.Equal(t, "testVersionTimeout", task.resourceVersion) + assert.Equal(t, subscribers, task.subs) +} + +func TestNotificationIdNotMatching(t *testing.T) { + var wg sync.WaitGroup + + // Start the TaskManager in a separate goroutine + wg.Add(1) + go func() { + defer wg.Done() + TaskMan.StartTaskManager() + }() + + handler := &moduleCiHandler{} + // Start the subscriber and wait for it to be ready + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleNotificationIdNotMatching", "testEventNotificationIdNotMatching", 1, handler) + }() + + // Wait for both the TaskManager and the subscriber to be started + wg.Wait() + + for i := 0; i < cap(TaskMan.taskStatusChan); i++ { + TaskMan.taskStatusChan <- &TaskStatus{} + } + + subscribers := eventbus.EBus.GetSubscribers("testEventNotificationIdNotMatching") + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEvent'") + } + TaskMan.CreateTask("testTaskNotificationIdNotMatching", "testTypeNotificationIdNotMatching", "testVersionNotificationIdNotMatching", subscribers) + + time.Sleep(500 * time.Millisecond) + + select { + case task := <-TaskMan.taskQueue.channel: + if task.name == "testTask" { + t.Errorf("assert failed:") + } + default: + } + + for len(TaskMan.taskStatusChan) > 0 { + <-TaskMan.taskStatusChan + } +} + +func TestStatusUpdatedChannelNotAvailable(t *testing.T) { + var wg sync.WaitGroup + + wg.Add(1) + + go handleTestEventbusy(&wg) + + wg.Wait() + actualRetVal := retVal + + assert.Equal(t, true, actualRetVal) +}