From d581d3f944ade99a10c99a76ccdd95983bf8cb6f Mon Sep 17 00:00:00 2001 From: Dimitrios Markou Date: Wed, 17 Jul 2024 14:43:34 +0000 Subject: [PATCH] fix(evpn-bridge): first review Signed-off-by: Dimitrios Markou --- pkg/infradb/subscriberframework/eventbus/eventbus.go | 10 ++++------ pkg/infradb/taskmanager/taskmanager.go | 3 ++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go index ea43e320..4e75a477 100644 --- a/pkg/infradb/subscriberframework/eventbus/eventbus.go +++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go @@ -130,17 +130,15 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool { // Publish api notifies the subscribers with certain eventType func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error { - e.publishL.RLock() - defer e.publishL.RUnlock() + e.publishL.Lock() + defer e.publishL.Unlock() var err error - // We need the default case here as if the subscriber is busy then we will not be able to sent to the subscriber channel - // and the Publish function will stuck. So the default case serves exctly this purpose. + select { case subscriber.Ch <- objectData: log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name) default: - log.Printf("Publish(): Channel for subsriber %s is busy. Notification not sent", subscriber.Name) - err = fmt.Errorf("channel is busy") + err = fmt.Errorf("channel for subscriber %s is busy.", subscriber.Name) } return err } diff --git a/pkg/infradb/taskmanager/taskmanager.go b/pkg/infradb/taskmanager/taskmanager.go index ce9446d1..b290bb09 100644 --- a/pkg/infradb/taskmanager/taskmanager.go +++ b/pkg/infradb/taskmanager/taskmanager.go @@ -131,7 +131,8 @@ func (t *TaskManager) processTasks() { NotificationID: uuid.NewString(), } if err := eventbus.EBus.Publish(objectData, sub); err != nil { - log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. Subscriber is busy. The Task %+v will be requeued.\n", sub, objectData, task) + log.Printf("processTasks(): Failed to sent notification: %+v\n", err) + log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. The Task %+v will be requeued.\n", sub, objectData, task) // We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task // so we do start again from the subscriber that returned an error or was unavailable for any reason. task.subIndex += i