Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(evpn-bridge): fix system behaviour for pending objects #391

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions pkg/infradb/subscriberframework/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package eventbus

import (
"fmt"
"log"
"sort"
"sync"
Expand Down Expand Up @@ -89,7 +90,7 @@

subscriber := &Subscriber{
Name: moduleName,
Ch: make(chan interface{}, 1),
Ch: make(chan interface{}),
Quit: make(chan bool),
Priority: priority,
}
Expand Down Expand Up @@ -128,10 +129,20 @@
}

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) {
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error {

Check warning on line 132 in pkg/infradb/subscriberframework/eventbus/eventbus.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/subscriberframework/eventbus/eventbus.go#L132

Added line #L132 was not covered by tests
e.publishL.RLock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... if write into a channel, why do we use read lock here? With the read lock many threads can publish at the same time. We apparently want to use Lock here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right. The question though is if we actually need a Lock.

In a case where two goroutines try to write on the same channel I think golang is smart enough to block the second goroutine until the first one has finished writing. In that case we do not need any Lock. is that correct ?

WDYT ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we need the lock only to lock on the channel, then we can remove it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think the Lock is not needed here we can remove it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls remove then

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

defer e.publishL.RUnlock()
subscriber.Ch <- objectData
var err error
// We need the default case here as if the subscriber is busy then we will not be able to sent to the subscriber channel
artek-koltun marked this conversation as resolved.
Show resolved Hide resolved
// and the Publish function will stuck. So the default case serves exctly this purpose.
select {
case subscriber.Ch <- objectData:
log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name)
default:
log.Printf("Publish(): Channel for subsriber %s is busy. Notification not sent", subscriber.Name)
err = fmt.Errorf("channel is busy")

Check warning on line 143 in pkg/infradb/subscriberframework/eventbus/eventbus.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/subscriberframework/eventbus/eventbus.go#L135-L143

Added lines #L135 - L143 were not covered by tests
artek-koltun marked this conversation as resolved.
Show resolved Hide resolved
}
return err

Check warning on line 145 in pkg/infradb/subscriberframework/eventbus/eventbus.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/subscriberframework/eventbus/eventbus.go#L145

Added line #L145 was not covered by tests
}

// Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out)
Expand Down
66 changes: 49 additions & 17 deletions pkg/infradb/taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
objectType string
resourceVersion string
subIndex int
retryTimer time.Duration
subs []*eventbus.Subscriber
// systemTimer is used only when we want to retry a Task due to unavailability of the Subscriber or not receiving a TaskStatus
systemTimer time.Duration
subs []*eventbus.Subscriber
}

// TaskStatus holds info related to the status that has been received
Expand Down Expand Up @@ -60,6 +61,7 @@
objectType: objectType,
resourceVersion: resourceVersion,
subIndex: 0,
systemTimer: 1 * time.Second,
subs: subs,
}
}
Expand Down Expand Up @@ -94,13 +96,18 @@
// StatusUpdated creates a task status and sends it for handling
func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificationID string, dropTask bool, component *common.Component) {
taskStatus := newTaskStatus(name, objectType, resourceVersion, notificationID, dropTask, component)

// Do we need to make this call happen in a goroutine in order to not make the
// StatusUpdated function stuck in case that nobody reads what is written on the channel ?
// Is there any case where this can happen
// (nobody reads what is written on the channel and the StatusUpdated gets stuck) ?
t.taskStatusChan <- taskStatus
log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus)
log.Printf("StatusUpdated(): New Task Status has been created: %+v\n", taskStatus)

// We need to have a default case here so the call is not stuck if we send to channel but there is nobody reading from the channel.
// e.g. a subscriber got stuck and doesn't reply. The task will be requeued after the timer gets expired. In the meanwhile
// the subscriber replies and a taskStatus is sent to channel but the call gets stuck there as the previous task has not been requeued yet
// as the timer has not expired and the queue is empty (We assume that there is only one task in the queue).
select {
case t.taskStatusChan <- taskStatus:
log.Printf("StatusUpdated(): Task Status has been sent to channel: %+v\n", taskStatus)
default:
log.Printf("StatusUpdated(): Task Status has not been sent to channel. Channel not available: %+v\n", taskStatus)

Check warning on line 109 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L99-L109

Added lines #L99 - L109 were not covered by tests
}
}

// processTasks processes the task
Expand All @@ -123,7 +130,18 @@
// (e.g. Maybe you have a timeout on the subscribers and you got the notification after the timeout have passed)
NotificationID: uuid.NewString(),
}
eventbus.EBus.Publish(objectData, sub)
if err := eventbus.EBus.Publish(objectData, sub); err != nil {
log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. Subscriber is busy. The Task %+v will be requeued.\n", sub, objectData, task)
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
// so we do start again from the subscriber that returned an error or was unavailable for any reason.
task.subIndex += i
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are at subscriber with index i at the moment
if you use += here and you already have subIndex as non-zero, won't you end up with a wrong index to start with next time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this will not happen as the next time we will take a sublist of the subscribers based on the subIndex. Then we will iterate on that sublist and that means that the i will start from 0 again.

check here: https://github.com/opiproject/opi-evpn-bridge/blob/main/pkg/infradb/taskmanager/taskmanager.go#L114

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds a bit error-prone
What if we, instead of index calculations, copy the rest of subscribers into a task, so it could continue where it stopped?
Any other means?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I like the solution as it is today. It looks cleaner to me. It is tested and it works so nice so I would not like to change it if that is ok with you. I can put a comment if you want so people can understand what this index calculation is all about. I do not like so much to be honest to keep sublists all the time for the remaining subscribers to me that is a bit more error prone. Maybe we can revisit this issue in the future as this bug fix here is not related to the subIndex. The SubIndex was allready there from the beggining

Copy link
Contributor

@artek-koltun artek-koltun Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me it is not. I already misinterpreted what is happening here. We could create an issue and discuss it there. I don't know if we can add a Subscriber in the middle and if we need to regard that a new one and other details

I can put a comment if you want so people can understand what this index calculation is all about.

Please do. I am also wondering if it could be a single place like a function to "re-queue task"

Maybe we can revisit this issue in the future as this bug fix here is not related to the subIndex. The SubIndex was allready there from the beggining

Please create an issue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

#393

task.systemTimer *= 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • maybe the name should be smth like waitForRetry?
  • Will we stop increasing timer if no one is listening for the published task on another end?

Copy link
Contributor Author

@mardim91 mardim91 Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I would like to keep it as a system timer as it is more explicit if you compare it also with the Component.Timer which is provided by each component in case of Error :
    time.AfterFunc(taskStatus.component.Timer, func() {
  • The timer will increase every time that we try to publish but the channel is busy and we need to reque the task. So it is working this way:
  1. We publish
  2. An error is returned because the channel is busy
  3. We increase the timer
  4. We wait for the timer to expire in a goroutine and then we requeue the task

Copy link
Contributor

@artek-koltun artek-koltun Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flow you described is clear. I want to know if we ever stop increasing it. Will it make sense to wait for hours? days? years?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we never stop increasing it. But this is a known issue that we have just not addressed do far. We are planning to address this in the future. I can open an issue to track it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls create an issue

Copy link
Contributor Author

@mardim91 mardim91 Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done
#394

log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
time.AfterFunc(task.systemTimer, func() {
t.taskQueue.Enqueue(task)
})
break loopTwo

Check warning on line 143 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L133-L143

Added lines #L133 - L143 were not covered by tests
}
log.Printf("processTasks(): Notification has been sent to subscriber %+v with data %+v\n", sub, objectData)

loopThree:
Expand All @@ -143,11 +161,17 @@
log.Printf("processTasks(): received notification id %+v doesn't equal the sent notification id %+v\n", taskStatus.notificationID, objectData.NotificationID)

// We need a timeout in case that the subscriber doesn't update the status at all for whatever reason.
// If that occurs then we just take a note which subscriber need to revisit and we requeue the task without any timer
// If that occurs then we just requeue the task with a timer
case <-time.After(30 * time.Second):
log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued immediately Task Status %+v\n", sub, task, taskStatus)
log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued. Task Status %+v\n", sub, task, taskStatus)
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
// so we do start again from the subscriber that returned an error or was unavailable for any reason.

Check warning on line 168 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L166-L168

Added lines #L166 - L168 were not covered by tests
task.subIndex += i
go t.taskQueue.Enqueue(task)
task.systemTimer *= 2
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
time.AfterFunc(task.systemTimer, func() {
t.taskQueue.Enqueue(task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. if we do not receive an answer within reasonable time, does it mean that we have a problem and probably no one is handling our requests? and the best we can do is report an error and gracefully shutdown?

  2. If we block for 30 seconds in this thread, can we handle other task responses? If we are blocked for 30 seconds, doesn't it mean that what you actually want is to wait for a response in another goroutine which could continue handling when the response is received?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This is not built for that. What we want here is that when we do not receive any answer because for whatever reason the subsciber is stuck we would like to requeue the task and try again because maybe the unresponsivness of the subscriber is temporary. Now if the subscriber is completely dead and cannot send a response back at all we need to put in plase some resiliency mechanisms to handle this case btu this is something to investigate and implement in the future currently we do not look into that corner case.
  2. We will not have multiple subscribers sending multiple responses at the same time. The system is designed this way where we send a task to a subscriber and then we just wait for that subsriber to respond before we move on to the next subscriber. You will not hve a case where we send in parallel multipe tasks to multiple subscribers and the subscribers will send in the same time multiple respnses back in parallel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now if the subscriber is completely dead and cannot send a response back at all we need to put in plase some resiliency mechanisms to handle this case btu this is something to investigate and implement in the future currently we do not look into that corner case.

Issue is needed

The system is designed this way where we send a task to a subscriber and then we just wait for that subsriber to respond before we move on to the next subscriber.

Sounds like a sequential flow. Do we need channels then? Pls consider. It would make all the things much simpler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think channels and notifications make the whole system scale better. The implementation that we have so far it just uses one task queue and one process that drains the queue. Maybe in the future if we hit any limitation we could use more processes to drain the queue of the tasks and I think this will scale easier when we are using channels and notifications. So I would prefer keeping it this way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in the future if we hit any limitation we could use more processes to drain the queue of the tasks and I think this will scale easier when we are using channels and notifications.

But we didn't hit, and we don't know if we really need it in the future, but already complicating the design.
If we need to scale, there are likely better approaches than this sequential one.
If we don't need to scale, then we are complicating the system for nothing at the moment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I didn't get your question correct at the beggining. I apologize for this

The channels are not so much related to the sequential flow as to keep the core system as much as was possible agnostic to the plugins that we need to send and receive notifications. The plugins are responsible to configuring the underline system of their responsibility (e.g. FRRModule for FRR, LinuxModule for Linux etc...). The core system doesn't need to know what are those plugins or what they exactly do but the only thing that they need to know is towards which subscribers they need to send notifications. The channels help in this agnostic notion as well as to the communication between the different go routines as each subscriber runs as a different go routine.

Also the sequential flow it is there because the different plugins have some sort of dependency into eachother. That dependency is expressed by the sequential flow and that is why we want the first plugin to succeed before we move to the second one. Because for instance if the Linux Vendor Module wants to attach an interface into a bridge which bridge has been created before by General Linux Module if the sequential flow is not there the Linux Vendor module call will fail as it has a dependency to the General Linux module operation first to create the bridge.

This is a significant design choice which has been presented to the TSC before the implementation and we have decided that was resonable to move forward with it . Also we think that is a good design and works well so far and I do not really agree that the channels make the system complicated as they serve the architecture well.

Copy link
Contributor

@artek-koltun artek-koltun Jul 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channels are not so much related to the sequential flow as to keep the core system as much as was possible agnostic to the plugins that we need to send and receive notifications.

The same level of agnosticism can be achieved with a function calling. The channels do not add more agnosticism here.
By the definition: Channels are the pipes that connect concurrent goroutines. You can send values into channels from one goroutine and receive those values into another goroutine.

as well as to the communication between the different go routines as each subscriber runs as a different go routine.

They can be run in different routines, even if we call them by a function (you can use mutex to sync or send to channel or execute right in the same thread context)

Also the sequential flow it is there because the different plugins have some sort of dependency into eachother. That dependency is expressed by the sequential flow and that is why we want the first plugin to succeed before we move to the second one.

We send a msg, and go to sleep waiting for a plugin to complete its job transforming our flow into sequential

Because for instance if the Linux Vendor Module wants to attach an interface into a bridge which bridge has been created before by General Linux Module if the sequential flow is not there the Linux Vendor module call will fail as it has a dependency to the General Linux module operation first to create the bridge.

What if we do not register General Linux Module at all so it won't receive notifications at all?

Also we think that is a good design and works well so far and I do not really agree that the channels make the system complicated as they serve the architecture well.

It might be. But from the chunk I see, it looks like you need a sequential flow and channels add complexity.

Apparently this PR is not the place to make such decisions, but pls consider

})

Check warning on line 174 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L170-L174

Added lines #L170 - L174 were not covered by tests
break loopThree
}
}
Expand All @@ -159,19 +183,27 @@
break loopTwo
}

// We re-initialize the systemTimer every time that we get a taskStatus. That means that the subscriber is available and has responded
task.systemTimer = 1 * time.Second

Check warning on line 188 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L187-L188

Added lines #L187 - L188 were not covered by tests
switch taskStatus.component.CompStatus {
case common.ComponentStatusSuccess:
log.Printf("processTasks(): Subscriber %+v has processed the task %+v successfully\n", sub, task)
continue loopTwo
default:
case common.ComponentStatusError:

Check warning on line 193 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L193

Added line #L193 was not covered by tests
log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task)
log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer)
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
// so we do start again from the subscriber that returned an error or was unavailable for any reason.

Check warning on line 197 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L195-L197

Added lines #L195 - L197 were not covered by tests
task.subIndex += i
task.retryTimer = taskStatus.component.Timer
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.retryTimer)
time.AfterFunc(task.retryTimer, func() {
time.AfterFunc(taskStatus.component.Timer, func() {

Check warning on line 199 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L199

Added line #L199 was not covered by tests
t.taskQueue.Enqueue(task)
})
break loopTwo
default:
artek-koltun marked this conversation as resolved.
Show resolved Hide resolved
log.Printf("processTasks(): Subscriber %+v has not provided designated status for the task %+v\n", sub, task)
log.Printf("processTasks(): The task %+v will be dropped\n", task)
break loopTwo

Check warning on line 206 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L203-L206

Added lines #L203 - L206 were not covered by tests
}
}
}
Expand Down
Loading