Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added stop committee runner queue consumption trigger #1436

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ba61a84
added stop committee runner queue consumption when round timeout is re…
AKorpusenko Jun 19, 2024
3b18674
minor improvements and duty runner delete when not needed
AKorpusenko Jun 19, 2024
eac3b03
removed deleting committee runner on timeout
AKorpusenko Jun 19, 2024
ed0c8c0
added return nil instead of break in loop
AKorpusenko Jun 24, 2024
0ad335e
added return nil instead of break in loop
AKorpusenko Jun 24, 2024
6cdc714
Merge branch 'alan/no-fork' of github.com:ssvlabs/ssv into alan/no-fo…
AKorpusenko Jun 24, 2024
38ba102
Merge branch 'alan/no-fork-add-consume-queue-exit' of github.com:ssvl…
AKorpusenko Jun 24, 2024
fbd352e
removed redundant select
AKorpusenko Jun 24, 2024
a576164
reverted breaks replacing with return nil
AKorpusenko Jun 24, 2024
91fbaef
moved the defer cancel in func beginning
AKorpusenko Jun 24, 2024
014a29d
added stop queue for committee runners' queues on no validators event
AKorpusenko Jun 24, 2024
1e771fe
added mtx unlock before cancelF call
AKorpusenko Jun 25, 2024
76e25eb
fixed concurrency issues
AKorpusenko Jun 25, 2024
a3b46b1
renamed StopQueues -> Stop. Added queue existence check
AKorpusenko Jun 25, 2024
3d5ae0c
Merge branch 'alan/no-fork' of github.com:ssvlabs/ssv into alan/no-fo…
AKorpusenko Jun 26, 2024
810cfe0
Updated comment
AKorpusenko Jun 26, 2024
86b9fde
Merge branch 'stage' of github.com:ssvlabs/ssv into alan/no-fork-add-…
AKorpusenko Jul 29, 2024
e1e62db
Merge branch 'alan/no-fork-add-consume-queue-exit' of github.com:ssvl…
AKorpusenko Jul 29, 2024
8b3c9c7
added presence check
AKorpusenko Jul 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions operator/validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,7 @@ func (c *controller) onShareStop(pubKey spectypes.ValidatorPK) {
if ok {
vc.RemoveShare(v.Share.Share.ValidatorIndex)
if len(vc.Shares) == 0 {
vc.Stop()
deletedCommittee := c.validatorsMap.RemoveCommittee(v.Share.CommitteeID())
if deletedCommittee == nil {
c.logger.Warn("could not find committee to remove on no validators",
Expand All @@ -872,7 +873,6 @@ func (c *controller) onShareStop(pubKey spectypes.ValidatorPK) {
)
return
}
// TODO: (Alan) stop committee runners queues consumption
}
}
}
Expand Down Expand Up @@ -932,9 +932,9 @@ func (c *controller) onShareInit(share *ssvtypes.SSVShare) (*validator.Validator
zap.String("committee_id", hex.EncodeToString(operator.ClusterID[:])),
}...)

committeRunnerFunc := SetupCommitteeRunners(ctx, logger, opts)
committeeRunnerFunc := SetupCommitteeRunners(ctx, logger, opts)

vc = validator.NewCommittee(c.context, logger, c.beacon.GetBeaconNetwork(), operator, opts.SignatureVerifier, committeRunnerFunc)
vc = validator.NewCommittee(c.context, logger, c.beacon.GetBeaconNetwork(), operator, opts.SignatureVerifier, committeeRunnerFunc)
vc.AddShare(&share.Share)
c.validatorsMap.PutCommittee(operator.ClusterID, vc)

Expand Down
32 changes: 30 additions & 2 deletions protocol/v2/ssv/validator/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty)
// Set timeout function.
r.GetBaseRunner().TimeoutF = c.onTimeout
c.Runners[duty.Slot] = r

// required to stop the queue consumer when timeout message is received by handler
queueCtx, cancelF := context.WithCancel(c.ctx)

if _, ok := c.Queues[duty.Slot]; !ok {
c.Queues[duty.Slot] = queueContainer{
Q: queue.WithMetrics(queue.New(1000), nil), // TODO alan: get queue opts from options
Expand All @@ -131,9 +135,17 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty)

}

// Set the cancel function separately, because the queue could have been created in HandleMessage
q, _ := c.Queues[duty.Slot]
q.StopQueueF = cancelF

logger = c.logger.With(fields.DutyID(fields.FormatCommitteeDutyID(c.Operator.Committee, c.BeaconNetwork.EstimatedEpochAtSlot(duty.Slot), duty.Slot)), fields.Slot(duty.Slot))
// TODO alan: stop queue
go c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage)
go func() {
err := c.ConsumeQueue(queueCtx, q, logger, duty.Slot, c.ProcessMessage)
if err != nil {
logger.Error("❗ failed committee queue consumption", zap.Error(err))
}
}()

logger.Info("ℹ️ starting duty processing")
return c.Runners[duty.Slot].StartNewDuty(logger, duty)
Expand Down Expand Up @@ -293,6 +305,22 @@ func (c *Committee) ProcessMessage(logger *zap.Logger, msg *queue.DecodedSSVMess

}

// Stop walks every entry of c.Queues while holding c.mtx and invokes the
// entry's StopQueueF. An entry whose StopQueueF is nil is logged as an error
// and skipped; the remaining entries are still processed.
func (c *Committee) Stop() {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	for dutySlot, container := range c.Queues {
		// Happy path first: a cancel function is present, so call it and move on.
		if stop := container.StopQueueF; stop != nil {
			stop()
			continue
		}

		// Nothing to call for this slot; report which duty was affected.
		epoch := c.BeaconNetwork.EstimatedEpochAtSlot(dutySlot)
		dutyID := fields.FormatCommitteeDutyID(c.Operator.Committee, epoch, dutySlot)
		c.logger.Error("⚠️ can't stop committee queue StopQueueF is nil",
			fields.DutyID(dutyID),
			fields.Slot(dutySlot),
		)
	}
}

// updateAttestingSlotMap updates the highest attesting slot map from beacon duties
func (c *Committee) updateAttestingSlotMap(duty *spectypes.CommitteeDuty) {
for _, beaconDuty := range duty.BeaconDuties {
Expand Down
31 changes: 13 additions & 18 deletions protocol/v2/ssv/validator/committee_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package validator

import (
"context"
"fmt"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/pkg/errors"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging/fields"
Expand Down Expand Up @@ -72,24 +70,21 @@ func (v *Committee) HandleMessage(logger *zap.Logger, msg *queue.DecodedSSVMessa

// ConsumeQueue consumes messages from the queue.Queue of the controller
// it checks for current state
func (v *Committee) ConsumeQueue(logger *zap.Logger, slot phase0.Slot, handler MessageHandler) error {
ctx, cancel := context.WithCancel(v.ctx)
defer cancel()

var q queueContainer
err := func() error {
v.mtx.RLock() // read v.Queues
defer v.mtx.RUnlock()
var ok bool
q, ok = v.Queues[slot]
if !ok {
return errors.New(fmt.Sprintf("queue not found for slot %d", slot))
func (v *Committee) ConsumeQueue(
ctx context.Context,
q queueContainer,
logger *zap.Logger,
slot phase0.Slot,
handler MessageHandler,
) error {
// in case of any error try to call the ctx.cancel to prevent the ctx leak
defer func() {
if q.StopQueueF == nil {
logger.Error("⚠️ committee queue consumer StopQueueF is nil", fields.Slot(slot))
return
}
return nil
q.StopQueueF()
}()
if err != nil {
return err
}

logger.Debug("📬 queue consumer is running")

Expand Down
13 changes: 12 additions & 1 deletion protocol/v2/ssv/validator/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package validator
import (
"fmt"

"github.com/ssvlabs/ssv/logging/fields"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/protocol/v2/ssv/queue"
Expand Down Expand Up @@ -42,13 +43,23 @@ func (c *Committee) handleEventMessage(logger *zap.Logger, msg *queue.DecodedSSV
if err != nil {
return err
}

c.mtx.Lock()
dutyRunner := c.Runners[slot] // TODO: err check , runner exist?
q, qExists := c.Queues[slot]
dutyRunner, rExists := c.Runners[slot]
c.mtx.Unlock()

if !rExists || !qExists {
logger.Error("no committee runner found for slot", fields.Slot(slot), fields.MessageID(msg.MsgID))
AKorpusenko marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

if err := dutyRunner.GetBaseRunner().QBFTController.OnTimeout(logger, *eventMsg); err != nil {
return fmt.Errorf("timeout event: %w", err)
}

q.StopQueueF()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we stopping the queue on timeout message?


return nil
case types.ExecuteDuty:
if err := c.OnExecuteDuty(logger, eventMsg); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions protocol/v2/ssv/validator/msgqueue_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type MessageHandler func(logger *zap.Logger, msg *queue.DecodedSSVMessage) error
// queueContainer wraps a queue with its corresponding state
// and the function used to stop the consumer of that queue.
type queueContainer struct {
// Q is the wrapped message queue.
Q queue.Queue
// StopQueueF is a context cancel function; it may be nil, so callers
// should check it before invoking.
// NOTE(review): presumably it cancels the context the queue consumer runs under — confirm against StartDuty.
StopQueueF context.CancelFunc

// queueState is the state kept alongside Q.
// NOTE(review): exact semantics are defined by queue.State — confirm there.
queueState *queue.State
}

Expand Down
Loading