From 85bde5991b96c1d84f885c92c2591ea4a0142174 Mon Sep 17 00:00:00 2001 From: Dimitrios Markou Date: Fri, 12 Jul 2024 15:42:59 +0100 Subject: [PATCH 1/4] fix(evpn-bridge): fix system behaviour for pending objects Signed-off-by: Dimitrios Markou --- .../subscriberframework/eventbus/eventbus.go | 17 ++++- pkg/infradb/taskmanager/taskmanager.go | 66 ++++++++++++++----- 2 files changed, 63 insertions(+), 20 deletions(-) diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go index 2d8a23d8..ea43e320 100644 --- a/pkg/infradb/subscriberframework/eventbus/eventbus.go +++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go @@ -6,6 +6,7 @@ package eventbus import ( + "fmt" "log" "sort" "sync" @@ -89,7 +90,7 @@ func (e *EventBus) Subscribe(moduleName, eventType string, priority int, eventHa subscriber := &Subscriber{ Name: moduleName, - Ch: make(chan interface{}, 1), + Ch: make(chan interface{}), Quit: make(chan bool), Priority: priority, } @@ -128,10 +129,20 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool { } // Publish api notifies the subscribers with certain eventType -func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) { +func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error { e.publishL.RLock() defer e.publishL.RUnlock() - subscriber.Ch <- objectData + var err error + // We need the default case here as if the subscriber is busy then we will not be able to sent to the subscriber channel + // and the Publish function will stuck. So the default case serves exctly this purpose. + select { + case subscriber.Ch <- objectData: + log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name) + default: + log.Printf("Publish(): Channel for subsriber %s is busy. Notification not sent", subscriber.Name) + err = fmt.Errorf("channel is busy") + } + return err } // Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out) diff --git a/pkg/infradb/taskmanager/taskmanager.go b/pkg/infradb/taskmanager/taskmanager.go index 3f32c633..ce9446d1 100644 --- a/pkg/infradb/taskmanager/taskmanager.go +++ b/pkg/infradb/taskmanager/taskmanager.go @@ -31,8 +31,9 @@ type Task struct { objectType string resourceVersion string subIndex int - retryTimer time.Duration - subs []*eventbus.Subscriber + // systemTimer is used only when we want to retry a Task due to unavailability of the Subscriber or not receiving a TaskStatus + systemTimer time.Duration + subs []*eventbus.Subscriber } // TaskStatus holds info related to the status that has been received @@ -60,6 +61,7 @@ func newTask(name, objectType, resourceVersion string, subs []*eventbus.Subscrib objectType: objectType, resourceVersion: resourceVersion, subIndex: 0, + systemTimer: 1 * time.Second, subs: subs, } } @@ -94,13 +96,18 @@ func (t *TaskManager) CreateTask(name, objectType, resourceVersion string, subs // StatusUpdated creates a task status and sends it for handling func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificationID string, dropTask bool, component *common.Component) { taskStatus := newTaskStatus(name, objectType, resourceVersion, notificationID, dropTask, component) - - // Do we need to make this call happen in a goroutine in order to not make the - // StatusUpdated function stuck in case that nobody reads what is written on the channel ? - // Is there any case where this can happen - // (nobody reads what is written on the channel and the StatusUpdated gets stuck) ? - t.taskStatusChan <- taskStatus - log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus) + log.Printf("StatusUpdated(): New Task Status has been created: %+v\n", taskStatus) + + // We need to have a default case here so the call is not stuck if we send to channel but there is nobody reading from the channel. + // e.g. a subscriber got stuck and doesn't reply. The task will be requeued after the timer gets expired. In the meanwhile + // the subscriber replies and a taskStatus is sent to channel but the call gets stuck there as the previous task has not been requeued yet + // as the timer has not expired and the queue is empty (We assume that there is only one task in the queue). + select { + case t.taskStatusChan <- taskStatus: + log.Printf("StatusUpdated(): Task Status has been sent to channel: %+v\n", taskStatus) + default: + log.Printf("StatusUpdated(): Task Status has not been sent to channel. Channel not available: %+v\n", taskStatus) + } } // processTasks processes the task @@ -123,7 +130,18 @@ func (t *TaskManager) processTasks() { // (e.g. Maybe you have a timeout on the subscribers and you got the notification after the timeout have passed) NotificationID: uuid.NewString(), } - eventbus.EBus.Publish(objectData, sub) + if err := eventbus.EBus.Publish(objectData, sub); err != nil { + log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. Subscriber is busy. The Task %+v will be requeued.\n", sub, objectData, task) + // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task + // so we do start again from the subscriber that returned an error or was unavailable for any reason. + task.subIndex += i + task.systemTimer *= 2 + log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer) + time.AfterFunc(task.systemTimer, func() { + t.taskQueue.Enqueue(task) + }) + break loopTwo + } log.Printf("processTasks(): Notification has been sent to subscriber %+v with data %+v\n", sub, objectData) loopThree: @@ -143,11 +161,17 @@ func (t *TaskManager) processTasks() { log.Printf("processTasks(): received notification id %+v doesn't equal the sent notification id %+v\n", taskStatus.notificationID, objectData.NotificationID) // We need a timeout in case that the subscriber doesn't update the status at all for whatever reason. - // If that occurs then we just take a note which subscriber need to revisit and we requeue the task without any timer + // If that occurs then we just requeue the task with a timer case <-time.After(30 * time.Second): - log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued immediately Task Status %+v\n", sub, task, taskStatus) + log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued. Task Status %+v\n", sub, task, taskStatus) + // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task + // so we do start again from the subscriber that returned an error or was unavailable for any reason. task.subIndex += i - go t.taskQueue.Enqueue(task) + task.systemTimer *= 2 + log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer) + time.AfterFunc(task.systemTimer, func() { + t.taskQueue.Enqueue(task) + }) break loopThree } } @@ -159,19 +183,27 @@ func (t *TaskManager) processTasks() { break loopTwo } + // We re-initialize the systemTimer every time that we get a taskStatus. That means that the subscriber is available and has responded + task.systemTimer = 1 * time.Second + switch taskStatus.component.CompStatus { case common.ComponentStatusSuccess: log.Printf("processTasks(): Subscriber %+v has processed the task %+v successfully\n", sub, task) continue loopTwo - default: + case common.ComponentStatusError: log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task) + log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer) + // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task + // so we do start again from the subscriber that returned an error or was unavailable for any reason. task.subIndex += i - task.retryTimer = taskStatus.component.Timer - log.Printf("processTasks(): The Task will be requeued after %+v\n", task.retryTimer) - time.AfterFunc(task.retryTimer, func() { + time.AfterFunc(taskStatus.component.Timer, func() { t.taskQueue.Enqueue(task) }) break loopTwo + default: + log.Printf("processTasks(): Subscriber %+v has not provided designated status for the task %+v\n", sub, task) + log.Printf("processTasks(): The task %+v will be dropped\n", task) + break loopTwo } } } From 1a5cb19fa7e1cff9fa47dd7c686b1120d46af4a1 Mon Sep 17 00:00:00 2001 From: Dimitrios Markou Date: Wed, 17 Jul 2024 14:43:34 +0000 Subject: [PATCH 2/4] fix(evpn-bridge): first review Signed-off-by: Dimitrios Markou --- pkg/infradb/subscriberframework/eventbus/eventbus.go | 10 ++++------ pkg/infradb/taskmanager/taskmanager.go | 3 ++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go index ea43e320..e0a4c991 100644 --- a/pkg/infradb/subscriberframework/eventbus/eventbus.go +++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go @@ -130,17 +130,15 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool { // Publish api notifies the subscribers with certain eventType func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error { - e.publishL.RLock() - defer e.publishL.RUnlock() + e.publishL.Lock() + defer e.publishL.Unlock() var err error - // We need the default case here as if the subscriber is busy then we will not be able to sent to the subscriber channel - // and the Publish function will stuck. So the default case serves exctly this purpose. + select { case subscriber.Ch <- objectData: log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name) default: - log.Printf("Publish(): Channel for subsriber %s is busy. Notification not sent", subscriber.Name) - err = fmt.Errorf("channel is busy") + err = fmt.Errorf("channel for subscriber %s is busy", subscriber.Name) } return err } diff --git a/pkg/infradb/taskmanager/taskmanager.go b/pkg/infradb/taskmanager/taskmanager.go index ce9446d1..b290bb09 100644 --- a/pkg/infradb/taskmanager/taskmanager.go +++ b/pkg/infradb/taskmanager/taskmanager.go @@ -131,7 +131,8 @@ func (t *TaskManager) processTasks() { NotificationID: uuid.NewString(), } if err := eventbus.EBus.Publish(objectData, sub); err != nil { - log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. Subscriber is busy. The Task %+v will be requeued.\n", sub, objectData, task) + log.Printf("processTasks(): Failed to sent notification: %+v\n", err) + log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. The Task %+v will be requeued.\n", sub, objectData, task) // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task // so we do start again from the subscriber that returned an error or was unavailable for any reason. task.subIndex += i From 78e794ea15d9d996b2911ddfe52c7f5efe10b1c1 Mon Sep 17 00:00:00 2001 From: Dimitrios Markou Date: Thu, 22 Aug 2024 15:24:45 +0000 Subject: [PATCH 3/4] fix(evpn-bridge): second review Signed-off-by: Dimitrios Markou --- pkg/infradb/subscriberframework/eventbus/eventbus.go | 3 --- pkg/infradb/taskmanager/taskmanager.go | 11 +++++++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go index e0a4c991..72922020 100644 --- a/pkg/infradb/subscriberframework/eventbus/eventbus.go +++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go @@ -20,7 +20,6 @@ type EventBus struct { subscribers map[string][]*Subscriber eventHandlers map[string]EventHandler subscriberL sync.RWMutex - publishL sync.RWMutex mutex sync.RWMutex } @@ -130,8 +129,6 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool { // Publish api notifies the subscribers with certain eventType func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error { - e.publishL.Lock() - defer e.publishL.Unlock() var err error select { diff --git a/pkg/infradb/taskmanager/taskmanager.go b/pkg/infradb/taskmanager/taskmanager.go index b290bb09..a831f98a 100644 --- a/pkg/infradb/taskmanager/taskmanager.go +++ b/pkg/infradb/taskmanager/taskmanager.go @@ -118,6 +118,9 @@ func (t *TaskManager) processTasks() { task := t.taskQueue.Dequeue() log.Printf("processTasks(): Task has been dequeued for processing: %+v\n", task) + // A new sub-list of the initial subscribers list will be generated based on the value of the subIndex. + // This sub-list can be equal to the initial list (subIndex is equal to zero) or smaller than the initial + // list (subIndex greater than zero) in case a requeue event occurred. subsToIterate := task.subs[task.subIndex:] loopTwo: for i, sub := range subsToIterate { @@ -134,7 +137,9 @@ func (t *TaskManager) processTasks() { log.Printf("processTasks(): Failed to sent notification: %+v\n", err) log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. The Task %+v will be requeued.\n", sub, objectData, task) // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task - // so we do start again from the subscriber that returned an error or was unavailable for any reason. + // so we do start again from the subscriber that returned an error or was unavailable for any reason. The increasing + // of the subIndex value will be always correct as after the requeue of the task we generate and iterate on a new sub-list + // of the remaining subscribers which is equal or smaller than the initial subscribers list. task.subIndex += i task.systemTimer *= 2 log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer) @@ -195,7 +200,9 @@ func (t *TaskManager) processTasks() { log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task) log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer) // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task - // so we do start again from the subscriber that returned an error or was unavailable for any reason. + // so we do start again from the subscriber that returned an error or was unavailable for any reason. The increasing + // of the subIndex value will be always correct as after the requeue of the task we generate and iterate on a new sub-list + // of the remaining subscribers which is equal or smaller than the initial subscribers list. task.subIndex += i time.AfterFunc(taskStatus.component.Timer, func() { t.taskQueue.Enqueue(task) From f3e299f79bfb3dd182ecfabe59ea1d9c09929f2f Mon Sep 17 00:00:00 2001 From: atulpatel261194 Date: Thu, 5 Sep 2024 03:01:51 -0700 Subject: [PATCH 4/4] fix(evpn): added unit test for taskMgr and eventbus Signed-off-by: atulpatel261194 --- .../eventbus/event_bus_test.go | 184 +++++++++++ .../subscriberframework/eventbus/eventbus.go | 8 +- pkg/infradb/taskmanager/taskmanager_test.go | 288 ++++++++++++++++++ 3 files changed, 478 insertions(+), 2 deletions(-) create mode 100644 pkg/infradb/subscriberframework/eventbus/event_bus_test.go create mode 100644 pkg/infradb/taskmanager/taskmanager_test.go diff --git a/pkg/infradb/subscriberframework/eventbus/event_bus_test.go b/pkg/infradb/subscriberframework/eventbus/event_bus_test.go new file mode 100644 index 00000000..d50418fc --- /dev/null +++ b/pkg/infradb/subscriberframework/eventbus/event_bus_test.go @@ -0,0 +1,184 @@ +package eventbus + +import ( + "log" + "sync" + "testing" + "time" +) + +type moduleCiHandler struct { + receivedEvents []*ObjectData + sync.Mutex +} + +// Constants +const ( + TestEvent = "testEvent" + TestEventpriority = "testEventpriority" + TestEventChBusy = "testEventChBusy" + TestEventUnsub = "testEventUnsub" +) + +func (h *moduleCiHandler) HandleEvent(eventType string, objectData *ObjectData) { + h.Lock() + defer h.Unlock() + h.receivedEvents = append(h.receivedEvents, objectData) + switch eventType { + case TestEvent: + case TestEventpriority: + case TestEventChBusy: + case TestEventUnsub: + log.Printf("received event type %s", eventType) + default: + log.Panicf("error: Unknown event type %s", eventType) + } +} + +func TestSubscribeAndPublish(t *testing.T) { + handler := &moduleCiHandler{} + + EventBus := NewEventBus() + EventBus.StartSubscriber("testModule", TestEvent, 1, handler) + time.Sleep(10 * time.Millisecond) + + objectData := &ObjectData{ + ResourceVersion: "v1", + Name: "testObject", + NotificationID: "123", + } + + subscribers := EventBus.GetSubscribers(TestEvent) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEvent'") + } + subscriber := subscribers[0] + + err := EventBus.Publish(objectData, subscriber) + if err != nil { + t.Errorf("Publish() failed with error: %v", err) + } + + time.Sleep(10 * time.Millisecond) + handler.Lock() + if len(handler.receivedEvents) != 1 { + t.Errorf("Event was not received by the handler as expected") + } + if handler.receivedEvents[0] != objectData { + t.Errorf("Event data was not received by the handler as expected") + } + handler.Unlock() + + EventBus.Unsubscribe(subscriber) +} + +func TestPriorityOrderWithStartSubscriber(t *testing.T) { + handler1 := &moduleCiHandler{} + handler2 := &moduleCiHandler{} + + EventBus := NewEventBus() + + EventBus.StartSubscriber("testModule1", TestEventpriority, 2, handler1) + EventBus.StartSubscriber("testModule2", TestEventpriority, 1, handler2) + + time.Sleep(10 * time.Millisecond) + + subscribers := EventBus.GetSubscribers(TestEventpriority) + if len(subscribers) != 2 { + t.Errorf("Expected 2 subscribers, got %d", len(subscribers)) + } + if subscribers[0].Priority > subscribers[1].Priority { + t.Errorf("Subscribers are not sorted by priority") + } + + for _, sub := range subscribers { + EventBus.Unsubscribe(sub) + } +} +func TestPublishChannelBusyWithStartSubscriber(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleChBusy", TestEventChBusy, 1, handler) + + time.Sleep(10 * time.Millisecond) + + subscribers := EventBus.GetSubscribers(TestEventChBusy) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEventChBusy'") + } + subscriber := subscribers[0] + + subscriber.Ch <- &ObjectData{} + + objectData := &ObjectData{ + ResourceVersion: "v1", + Name: "testObject", + NotificationID: "123", + } + err := EventBus.Publish(objectData, subscriber) + if err == nil { + t.Errorf("Expected an error when publishing to a busy channel, but got nil") + } + + EventBus.Unsubscribe(subscriber) +} +func TestUnsubscribeEvent(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleUnsub", TestEventUnsub, 1, handler) + + subscribers := EventBus.GetSubscribers(TestEventUnsub) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEventUnsub'") + } + subscriber := subscribers[0] + + EventBus.UnsubscribeEvent(subscriber, TestEventUnsub) + + subscribers = EventBus.GetSubscribers(TestEventUnsub) + for _, sub := range subscribers { + if sub == subscriber { + t.Errorf("Subscriber was not successfully unsubscribed") + } + } +} + +func TestUnsubscribe(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleUnsub", TestEventUnsub, 1, handler) + + subscribers := EventBus.GetSubscribers(TestEventUnsub) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEventUnsub'") + } + subscriber := subscribers[0] + + EventBus.Unsubscribe(subscriber) + + select { + case _, ok := <-subscriber.Ch: + if ok { + t.Errorf("Subscriber's channel should be closed, but it's not") + } + default: + } +} + +func TestSubscriberAlreadyExist(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleSubExist", "testEventSubExist", 3, handler) + + exists := EventBus.subscriberExist("testEventSubExist", "testModuleSubExist") + if !exists { + t.Errorf("subscriberExist should return true for existing subscriber") + } + + subscribers := EventBus.GetSubscribers("testEventSubExist") + for _, sub := range subscribers { + if sub.Name == "testModuleSubExist" { + EventBus.Unsubscribe(sub) + } + } +} diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go index 72922020..21e9cd1b 100644 --- a/pkg/infradb/subscriberframework/eventbus/eventbus.go +++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go @@ -52,10 +52,13 @@ func (e *EventBus) StartSubscriber(moduleName, eventType string, priority int, e for { select { case event := <-subscriber.Ch: - log.Printf("\nSubscriber %s for %s received \n", moduleName, eventType) + log.Printf("\nSubscriber %s for %s received \n", moduleName, eventType) handlerKey := moduleName + "." + eventType - if handler, ok := e.eventHandlers[handlerKey]; ok { + e.subscriberL.Lock() + handler, ok := e.eventHandlers[handlerKey] + e.subscriberL.Unlock() + if ok { if objectData, ok := event.(*ObjectData); ok { handler.HandleEvent(eventType, objectData) } else { @@ -65,6 +68,7 @@ func (e *EventBus) StartSubscriber(moduleName, eventType string, priority int, e } else { subscriber.Ch <- "error: no event handler found" } + case <-subscriber.Quit: close(subscriber.Ch) return diff --git a/pkg/infradb/taskmanager/taskmanager_test.go b/pkg/infradb/taskmanager/taskmanager_test.go new file mode 100644 index 00000000..8a2830b8 --- /dev/null +++ b/pkg/infradb/taskmanager/taskmanager_test.go @@ -0,0 +1,288 @@ +package taskmanager + +import ( + "log" + "sync" + "testing" + "time" + + "github.com/opiproject/opi-evpn-bridge/pkg/infradb/common" + "github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus" + "github.com/stretchr/testify/assert" +) + +var ( + retValMu sync.Mutex + retVal bool +) + +type moduleCiHandler struct { + receivedEvents []*eventbus.ObjectData +} + +func handleTestEvent(objectData *eventbus.ObjectData) { + name := "testTask" + objectType := "testType" + resourceVersion := "testVersion" + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusSuccess, + } + + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func handleTestEventCompSuccess(objectData *eventbus.ObjectData) { + name := "testTaskCompSuccess" + objectType := "testTypeCompSuccess" + resourceVersion := "testVersionCompSuccess" + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusSuccess, + } + + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func handleTestEventbusy(wg *sync.WaitGroup) { + name := "testTaskbusy" + objectType := "testTypebusy" + resourceVersion := "testVersionbusy" + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusSuccess, + } + + TaskMan.StatusUpdated(name, objectType, resourceVersion, "notificationID", dropTask, component) + + retValMu.Lock() + retVal = true + retValMu.Unlock() + + wg.Done() +} +func handletestEventTimeout() { + +} +func handletestNotificationIDNotMatching(objectData *eventbus.ObjectData) { + name := "testTaskNotificationIdNotMatching" + objectType := "testTypeNotificationIdNotMatching" + resourceVersion := "testVersionNotificationIdNotMatching" + dropTask := false + + component := &common.Component{ + Name: "testModuleNotificationIdNotMatching", + CompStatus: common.ComponentStatusSuccess, + } + TaskMan.StatusUpdated(name, objectType, resourceVersion, "NotificationIdNotMatching", dropTask, component) + + time.Sleep(100 * time.Millisecond) + + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func handleTestEventError(objectData *eventbus.ObjectData) { + name := "testTaskEventError" + objectType := "testTypeEventError" + resourceVersion := "testVersionEventError" + + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusError, + } + if component.Timer == 0 { + component.Timer = 2 * time.Second + } else { + component.Timer *= 2 + } + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func (h *moduleCiHandler) HandleEvent(eventType string, objectData *eventbus.ObjectData) { + h.receivedEvents = append(h.receivedEvents, objectData) + switch eventType { + case "testEvent": + handleTestEvent(objectData) + case "testEventCompSuccess": + handleTestEventCompSuccess(objectData) + case "testEventError": + handleTestEventError(objectData) + case "testEventTimeout": + handletestEventTimeout() + case "testEventNotificationIdNotMatching": + handletestNotificationIDNotMatching(objectData) + default: + log.Printf("LCI: error: Unknown event type %s", eventType) + } +} + +func TestCreateTask(t *testing.T) { + subscriber := &eventbus.Subscriber{ + Name: "testSubscriber", + Ch: make(chan interface{}), + Quit: make(chan bool), + Priority: 1, + } + subs := []*eventbus.Subscriber{subscriber} + tm := newTaskManager() + tm.StartTaskManager() + tm.CreateTask("testTask", "testType", "testVersion", subs) + + time.Sleep(100 * time.Millisecond) + + task := tm.taskQueue.Dequeue() + assert.NotNil(t, task) + assert.Equal(t, "testTask", task.name) + assert.Equal(t, "testType", task.objectType) + assert.Equal(t, "testVersion", task.resourceVersion) + assert.Equal(t, subs, task.subs) +} + +func TestCompSuccess(t *testing.T) { + var wg sync.WaitGroup + + TaskMan.StartTaskManager() + handler := &moduleCiHandler{} + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleCompSuccess", "testEventCompSuccess", 1, handler) + }() + + wg.Wait() + + subscribers := eventbus.EBus.GetSubscribers("testEventCompSuccess") + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEvent'") + } + TaskMan.CreateTask("testTaskCompSuccess", "testTypeCompSuccess", "testVersionCompSuccess", subscribers) + + select { + case task := <-TaskMan.taskQueue.channel: + if task.name == "testTaskCompSuccess" { + t.Errorf("assert failed:") + } + default: + } +} + +func TestCompError(t *testing.T) { + var wg sync.WaitGroup + + TaskMan.StartTaskManager() + + handler := &moduleCiHandler{} + + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleEventError", "testEventError", 1, handler) + }() + + wg.Wait() + + subscribers := eventbus.EBus.GetSubscribers("testEventError") + + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEventError'") + } + + TaskMan.CreateTask("testTaskEventError", "testTypeEventError", "testVersionEventError", subscribers) + + task := TaskMan.taskQueue.Dequeue() + + assert.NotNil(t, task, "Task should not be nil") + assert.Equal(t, "testTaskEventError", task.name, "Task name should match") + assert.Equal(t, "testTypeEventError", task.objectType, "Task object type should match") + assert.Equal(t, "testVersionEventError", task.resourceVersion, "Task resource version should match") + assert.Equal(t, subscribers, task.subs, "Task subscribers should match") +} + +func TestTimeout(t *testing.T) { + var wg sync.WaitGroup + + handler := &moduleCiHandler{} + TaskMan.StartTaskManager() + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleTimeout", "testEventTimeout", 1, handler) + }() + + // Wait for both the TaskManager and the subscriber to be started + wg.Wait() + subscribers := eventbus.EBus.GetSubscribers("testEventTimeout") + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEventTimeout'") + } + TaskMan.CreateTask("testTaskTimeout", "testTypeTimeout", "testVersionTimeout", subscribers) + + time.Sleep(35 * time.Second) + + task := TaskMan.taskQueue.Dequeue() + + assert.NotNil(t, task) + assert.Equal(t, "testTaskTimeout", task.name) + assert.Equal(t, "testTypeTimeout", task.objectType) + assert.Equal(t, "testVersionTimeout", task.resourceVersion) + assert.Equal(t, subscribers, task.subs) +} + +func TestNotificationIdNotMatching(t *testing.T) { + var wg sync.WaitGroup + + // Start the TaskManager in a separate goroutine + wg.Add(1) + go func() { + defer wg.Done() + TaskMan.StartTaskManager() + }() + + handler := &moduleCiHandler{} + // Start the subscriber and wait for it to be ready + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleNotificationIdNotMatching", "testEventNotificationIdNotMatching", 1, handler) + }() + + // Wait for both the TaskManager and the subscriber to be started + wg.Wait() + + for i := 0; i < cap(TaskMan.taskStatusChan); i++ { + TaskMan.taskStatusChan <- &TaskStatus{} + } + + subscribers := eventbus.EBus.GetSubscribers("testEventNotificationIdNotMatching") + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEvent'") + } + TaskMan.CreateTask("testTaskNotificationIdNotMatching", "testTypeNotificationIdNotMatching", "testVersionNotificationIdNotMatching", subscribers) + + time.Sleep(500 * time.Millisecond) + + select { + case task := <-TaskMan.taskQueue.channel: + if task.name == "testTask" { + t.Errorf("assert failed:") + } + default: + } + + for len(TaskMan.taskStatusChan) > 0 { + <-TaskMan.taskStatusChan + } +} + +func TestStatusUpdatedChannelNotAvailable(t *testing.T) { + var wg sync.WaitGroup + + wg.Add(1) + + go handleTestEventbusy(&wg) + + wg.Wait() + actualRetVal := retVal + + assert.Equal(t, true, actualRetVal) +}