-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added stop committee runner queue consumption trigger #1436
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create a context when you create the queue and pass it in.
…rk-add-consume-queue-exit
…abs/ssv into alan/no-fork-add-consume-queue-exit
var q queueContainer | ||
|
||
// in case of any error try to call the ctx.cancel to prevent the ctx leak | ||
defer func() { | ||
var ok bool | ||
|
||
v.mtx.Lock() | ||
q, ok = v.Queues[slot] | ||
v.mtx.Unlock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StartDuty
is already locking, so this is potential "deadlock" or just double locking, although this happens in a goroutine and the lock is released in start duty. Still I think its better to just pass the Queue q
from StartDuty
as we're already using the mutex to get it there.
by passing it to this function you don't need to use the mutex here at all.
operator/validator/controller.go
Outdated
for _, slotQueue := range vc.Queues { | ||
slotQueue.StopQueueF() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can't do this here, accessing Queues requires mutex acquiring. best is to have a function inside the committee struct that handles this.
protocol/v2/ssv/validator/events.go
Outdated
|
||
if err := dutyRunner.GetBaseRunner().QBFTController.OnTimeout(logger, *eventMsg); err != nil { | ||
return fmt.Errorf("timeout event: %w", err) | ||
} | ||
|
||
c.Queues[slot].StopQueueF() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you need to lock the mutex to access Queues
. the best will be to get it same time you're getting the runner at the beginning of function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
concurrency issues..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small change
…rk-add-consume-queue-exit
Co-authored-by: Matus Kysel <[email protected]>
…consume-queue-exit
…abs/ssv into alan/no-fork-add-consume-queue-exit
err := c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, r) | ||
// Setting the cancel function separately due the queue could be created in HandleMessage | ||
|
||
logger = c.logger.With(fields.DutyID(fields.FormatCommitteeDutyID(c.Operator.Committee, c.BeaconNetwork.EstimatedEpochAtSlot(duty.Slot), duty.Slot)), fields.Slot(duty.Slot)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be in the other PR for committee ID no?
if err := dutyRunner.GetBaseRunner().QBFTController.OnTimeout(logger, *eventMsg); err != nil { | ||
return fmt.Errorf("timeout event: %w", err) | ||
} | ||
|
||
q.StopQueueF() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we stopping the queue on timeout message?
Replaced with PR #1556 based on the |
No description provided.