From 1c6fb598cce832afc239e6531f9dc96ec188a231 Mon Sep 17 00:00:00 2001 From: oleg-ssvlabs Date: Tue, 3 Dec 2024 11:37:31 +0100 Subject: [PATCH] Observability - instrument QBFT component, ctx propagation --- protocol/v2/qbft/controller/controller.go | 10 +-- .../v2/qbft/controller/controller_test.go | 5 +- protocol/v2/qbft/controller/timer.go | 6 +- protocol/v2/qbft/instance/commit.go | 5 +- protocol/v2/qbft/instance/instance.go | 19 ++--- protocol/v2/qbft/instance/metrics.go | 79 +++++++------------ protocol/v2/qbft/instance/observability.go | 53 +++++++++++++ protocol/v2/qbft/instance/prepare.go | 5 +- protocol/v2/qbft/instance/proposal.go | 7 +- protocol/v2/qbft/instance/round_change.go | 8 +- protocol/v2/qbft/instance/timeout.go | 6 +- protocol/v2/qbft/spectest/controller_type.go | 2 +- .../v2/qbft/spectest/msg_processing_type.go | 3 +- protocol/v2/qbft/spectest/timeout_type.go | 3 +- protocol/v2/ssv/runner/aggregator.go | 2 +- protocol/v2/ssv/runner/committee.go | 2 +- protocol/v2/ssv/runner/proposer.go | 2 +- protocol/v2/ssv/runner/runner.go | 6 +- .../ssv/runner/sync_committee_aggregator.go | 2 +- protocol/v2/ssv/validator/events.go | 4 +- 20 files changed, 139 insertions(+), 90 deletions(-) create mode 100644 protocol/v2/qbft/instance/observability.go diff --git a/protocol/v2/qbft/controller/controller.go b/protocol/v2/qbft/controller/controller.go index 9cd1323264..cad2459d5d 100644 --- a/protocol/v2/qbft/controller/controller.go +++ b/protocol/v2/qbft/controller/controller.go @@ -54,7 +54,7 @@ func NewController( } // StartNewInstance will start a new QBFT instance, if can't will return error -func (c *Controller) StartNewInstance(logger *zap.Logger, height specqbft.Height, value []byte) error { +func (c *Controller) StartNewInstance(ctx context.Context, logger *zap.Logger, height specqbft.Height, value []byte) error { if err := c.GetConfig().GetValueCheckF()(value); err != nil { return errors.Wrap(err, "value invalid") @@ -71,7 +71,7 @@ func (c *Controller) StartNewInstance(logger *zap.Logger, height specqbft.Height c.Height = height newInstance := c.addAndStoreNewInstance() - newInstance.Start(logger, value, height) + newInstance.Start(ctx, logger, value, height) c.forceStopAllInstanceExceptCurrent() return nil } @@ -118,10 +118,10 @@ func (c *Controller) ProcessMsg(ctx context.Context, logger *zap.Logger, signedM return nil, fmt.Errorf("future msg from height, could not process") } - return c.UponExistingInstanceMsg(logger, msg) + return c.UponExistingInstanceMsg(ctx, logger, msg) } -func (c *Controller) UponExistingInstanceMsg(logger *zap.Logger, msg *specqbft.ProcessingMessage) (*spectypes.SignedSSVMessage, error) { +func (c *Controller) UponExistingInstanceMsg(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage) (*spectypes.SignedSSVMessage, error) { inst := c.StoredInstances.FindInstance(msg.QBFTMessage.Height) if inst == nil { return nil, errors.New("instance not found") @@ -134,7 +134,7 @@ func (c *Controller) UponExistingInstanceMsg(logger *zap.Logger, msg *specqbft.P return nil, errors.New("not processing consensus message since instance is already decided") } - decided, _, decidedMsg, err := inst.ProcessMsg(logger, msg) + decided, _, decidedMsg, err := inst.ProcessMsg(ctx, logger, msg) if err != nil { return nil, errors.Wrap(err, "could not process msg") } diff --git a/protocol/v2/qbft/controller/controller_test.go b/protocol/v2/qbft/controller/controller_test.go index 75a84bc362..8aa6c8d08f 100644 --- a/protocol/v2/qbft/controller/controller_test.go +++ b/protocol/v2/qbft/controller/controller_test.go @@ -1,6 +1,7 @@ package controller import ( + "context" "encoding/json" "testing" @@ -82,7 +83,7 @@ func TestController_OnTimeoutWithRoundCheck(t *testing.T) { contr.StoredInstances.addNewInstance(inst) // Call OnTimeout and capture the error - err = contr.OnTimeout(logger, *msg) + err = contr.OnTimeout(context.TODO(), logger, *msg) // Assert that the error is nil and the round did not bump require.NoError(t, err) @@ -92,7 +93,7 @@ func TestController_OnTimeoutWithRoundCheck(t *testing.T) { inst.State.Round = specqbft.FirstRound // Call OnTimeout and capture the error - err = contr.OnTimeout(logger, *msg) + err = contr.OnTimeout(context.TODO(), logger, *msg) // Assert that the error is nil and the round did bump require.NoError(t, err) diff --git a/protocol/v2/qbft/controller/timer.go b/protocol/v2/qbft/controller/timer.go index fb6af7a196..5f7992906b 100644 --- a/protocol/v2/qbft/controller/timer.go +++ b/protocol/v2/qbft/controller/timer.go @@ -1,6 +1,8 @@ package controller import ( + "context" + "github.com/pkg/errors" "go.uber.org/zap" @@ -8,7 +10,7 @@ import ( ) // OnTimeout is trigger upon timeout for the given height -func (c *Controller) OnTimeout(logger *zap.Logger, msg types.EventMsg) error { +func (c *Controller) OnTimeout(ctx context.Context, logger *zap.Logger, msg types.EventMsg) error { // TODO add validation timeoutData, err := msg.GetTimeoutData() @@ -28,5 +30,5 @@ func (c *Controller) OnTimeout(logger *zap.Logger, msg types.EventMsg) error { if decided, _ := instance.IsDecided(); decided { return nil } - return instance.UponRoundTimeout(logger) + return instance.UponRoundTimeout(ctx, logger) } diff --git a/protocol/v2/qbft/instance/commit.go b/protocol/v2/qbft/instance/commit.go index 2b905ea249..e81757f1be 100644 --- a/protocol/v2/qbft/instance/commit.go +++ b/protocol/v2/qbft/instance/commit.go @@ -2,6 +2,7 @@ package instance import ( "bytes" + "context" "sort" "github.com/pkg/errors" @@ -16,7 +17,7 @@ import ( // UponCommit returns true if a quorum of commit messages was received. // Assumes commit message is valid! -func (i *Instance) UponCommit(logger *zap.Logger, msg *specqbft.ProcessingMessage, commitMsgContainer *specqbft.MsgContainer) (bool, []byte, *spectypes.SignedSSVMessage, error) { +func (i *Instance) UponCommit(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage, commitMsgContainer *specqbft.MsgContainer) (bool, []byte, *spectypes.SignedSSVMessage, error) { logger.Debug("📬 got commit message", fields.Round(i.State.Round), zap.Any("commit_signers", msg.SignedMessage.OperatorIDs), @@ -49,7 +50,7 @@ func (i *Instance) UponCommit(logger *zap.Logger, msg *specqbft.ProcessingMessag zap.Any("agg_signers", agg.OperatorIDs), fields.Root(msg.QBFTMessage.Root)) - i.metrics.EndStageCommit() + i.metrics.EndStageCommit(ctx) return true, fullData, agg, nil } diff --git a/protocol/v2/qbft/instance/instance.go b/protocol/v2/qbft/instance/instance.go index 67d688d32c..b81cbab42e 100644 --- a/protocol/v2/qbft/instance/instance.go +++ b/protocol/v2/qbft/instance/instance.go @@ -1,6 +1,7 @@ package instance import ( + "context" "encoding/base64" "encoding/json" "sync" @@ -69,10 +70,10 @@ func (i *Instance) ForceStop() { } // Start is an interface implementation -func (i *Instance) Start(logger *zap.Logger, value []byte, height specqbft.Height) { +func (i *Instance) Start(ctx context.Context, logger *zap.Logger, value []byte, height specqbft.Height) { i.startOnce.Do(func() { i.StartValue = value - i.bumpToRound(specqbft.FirstRound) + i.bumpToRound(ctx, specqbft.FirstRound) i.State.Height = height i.metrics.StartStage() i.config.GetTimer().TimeoutForRound(height, specqbft.FirstRound) @@ -126,7 +127,7 @@ func allSigners(all []*specqbft.ProcessingMessage) []spectypes.OperatorID { } // ProcessMsg processes a new QBFT msg, returns non nil error on msg processing error -func (i *Instance) ProcessMsg(logger *zap.Logger, msg *specqbft.ProcessingMessage) (decided bool, decidedValue []byte, aggregatedCommit *spectypes.SignedSSVMessage, err error) { +func (i *Instance) ProcessMsg(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage) (decided bool, decidedValue []byte, aggregatedCommit *spectypes.SignedSSVMessage, err error) { if !i.CanProcessMessages() { return false, nil, nil, errors.New("instance stopped processing messages") } @@ -139,18 +140,18 @@ func (i *Instance) ProcessMsg(logger *zap.Logger, msg *specqbft.ProcessingMessag switch msg.QBFTMessage.MsgType { case specqbft.ProposalMsgType: - return i.uponProposal(logger, msg, i.State.ProposeContainer) + return i.uponProposal(ctx, logger, msg, i.State.ProposeContainer) case specqbft.PrepareMsgType: - return i.uponPrepare(logger, msg, i.State.PrepareContainer) + return i.uponPrepare(ctx, logger, msg, i.State.PrepareContainer) case specqbft.CommitMsgType: - decided, decidedValue, aggregatedCommit, err = i.UponCommit(logger, msg, i.State.CommitContainer) + decided, decidedValue, aggregatedCommit, err = i.UponCommit(ctx, logger, msg, i.State.CommitContainer) if decided { i.State.Decided = decided i.State.DecidedValue = decidedValue } return err case specqbft.RoundChangeMsgType: - return i.uponRoundChange(logger, i.StartValue, msg, i.State.RoundChangeContainer, i.config.GetValueCheckF()) + return i.uponRoundChange(ctx, logger, i.StartValue, msg, i.State.RoundChangeContainer, i.config.GetValueCheckF()) default: return errors.New("signed message type not supported") } @@ -249,9 +250,9 @@ func (i *Instance) Decode(data []byte) error { } // bumpToRound sets round and sends current round metrics. -func (i *Instance) bumpToRound(round specqbft.Round) { +func (i *Instance) bumpToRound(ctx context.Context, round specqbft.Round) { i.State.Round = round - i.metrics.SetRound(round) + i.metrics.SetRound(ctx, round) } // CanProcessMessages will return true if instance can process messages diff --git a/protocol/v2/qbft/instance/metrics.go b/protocol/v2/qbft/instance/metrics.go index 5d772b4910..cc590c68e0 100644 --- a/protocol/v2/qbft/instance/metrics.go +++ b/protocol/v2/qbft/instance/metrics.go @@ -1,75 +1,56 @@ package instance import ( + "context" + "math" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" specqbft "github.com/ssvlabs/ssv-spec/qbft" - "go.uber.org/zap" + "go.opentelemetry.io/otel/metric" ) -var ( - metricsStageDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "ssv_validator_instance_stage_duration_seconds", - Help: "Instance stage duration (seconds)", - Buckets: []float64{0.02, 0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 5}, - }, []string{"stage"}) - metricsRound = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "ssv_qbft_instance_round", - Help: "QBFT instance round", - }, []string{"roleType"}) -) - -func init() { - allMetrics := []prometheus.Collector{ - metricsStageDuration, - metricsRound, - } - logger := zap.L() - for _, c := range allMetrics { - if err := prometheus.Register(c); err != nil { - logger.Debug("could not register prometheus collector") - } - } -} - type metrics struct { - StageStart time.Time - proposalDuration prometheus.Observer - prepareDuration prometheus.Observer - commitDuration prometheus.Observer - round prometheus.Gauge + stageStart time.Time + role string } func newMetrics(role string) *metrics { return &metrics{ - proposalDuration: metricsStageDuration.WithLabelValues("proposal"), - prepareDuration: metricsStageDuration.WithLabelValues("prepare"), - commitDuration: metricsStageDuration.WithLabelValues("commit"), - round: metricsRound.WithLabelValues(role), + role: role, } } func (m *metrics) StartStage() { - m.StageStart = time.Now() + m.stageStart = time.Now() } -func (m *metrics) EndStageProposal() { - m.proposalDuration.Observe(time.Since(m.StageStart).Seconds()) - m.StageStart = time.Now() +func (m *metrics) EndStageProposal(ctx context.Context) { + validatorStageDurationHistogram.Record( + ctx, + time.Since(m.stageStart).Seconds(), + metric.WithAttributes(stageAttribute(proposalStage))) + m.stageStart = time.Now() } -func (m *metrics) EndStagePrepare() { - m.prepareDuration.Observe(time.Since(m.StageStart).Seconds()) - m.StageStart = time.Now() +func (m *metrics) EndStagePrepare(ctx context.Context) { + validatorStageDurationHistogram.Record( + ctx, + time.Since(m.stageStart).Seconds(), + metric.WithAttributes(stageAttribute(prepareStage))) + m.stageStart = time.Now() } -func (m *metrics) EndStageCommit() { - m.commitDuration.Observe(time.Since(m.StageStart).Seconds()) - m.StageStart = time.Now() +func (m *metrics) EndStageCommit(ctx context.Context) { + validatorStageDurationHistogram.Record( + ctx, + time.Since(m.stageStart).Seconds(), + metric.WithAttributes(stageAttribute(commitStage))) + m.stageStart = time.Now() } -func (m *metrics) SetRound(round specqbft.Round) { - m.round.Set(float64(round)) +func (m *metrics) SetRound(ctx context.Context, round specqbft.Round) { + convertedRound := uint64(round) + if convertedRound <= math.MaxInt64 { + roundGauge.Record(ctx, int64(convertedRound), metric.WithAttributes(roleAttribute(m.role))) + } } diff --git a/protocol/v2/qbft/instance/observability.go b/protocol/v2/qbft/instance/observability.go new file mode 100644 index 0000000000..273fed747b --- /dev/null +++ b/protocol/v2/qbft/instance/observability.go @@ -0,0 +1,53 @@ +package instance + +import ( + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/ssvlabs/ssv/observability" +) + +const ( + observabilityName = "github.com/ssvlabs/ssv/protocol/v2/qbft" + observabilityNamespace = "ssv.validator" +) + +type stage string + +const ( + proposalStage stage = "proposal" + prepareStage stage = "prepare" + commitStage stage = "commit" +) + +var ( + meter = otel.Meter(observabilityName) + + validatorStageDurationHistogram = observability.NewMetric( + meter.Float64Histogram( + metricName("stage.duration"), + metric.WithUnit("s"), + metric.WithDescription("validator stage(proposal, prepare, commit) duration"), + metric.WithExplicitBucketBoundaries(observability.SecondsHistogramBuckets...))) + + roundGauge = observability.NewMetric( + meter.Int64Gauge( + metricName("round"), + metric.WithUnit("{round}"), + metric.WithDescription("QBFT instance round"))) +) + +func metricName(name string) string { + return fmt.Sprintf("%s.%s", observabilityNamespace, name) +} + +func stageAttribute(stage stage) attribute.KeyValue { + return attribute.String("ssv.validator.stage", string(stage)) +} + +func roleAttribute(role string) attribute.KeyValue { + return attribute.String("ssv.runner.role", role) +} diff --git a/protocol/v2/qbft/instance/prepare.go b/protocol/v2/qbft/instance/prepare.go index 2919c1616a..1402709d60 100644 --- a/protocol/v2/qbft/instance/prepare.go +++ b/protocol/v2/qbft/instance/prepare.go @@ -2,6 +2,7 @@ package instance import ( "bytes" + "context" "github.com/pkg/errors" specqbft "github.com/ssvlabs/ssv-spec/qbft" @@ -15,7 +16,7 @@ import ( // uponPrepare process prepare message // Assumes prepare message is valid! -func (i *Instance) uponPrepare(logger *zap.Logger, msg *specqbft.ProcessingMessage, prepareMsgContainer *specqbft.MsgContainer) error { +func (i *Instance) uponPrepare(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage, prepareMsgContainer *specqbft.MsgContainer) error { hasQuorumBefore := specqbft.HasQuorum(i.State.CommitteeMember, prepareMsgContainer.MessagesForRound(i.State.Round)) addedMsg, err := prepareMsgContainer.AddFirstMsgForSignerAndRound(msg) @@ -43,7 +44,7 @@ func (i *Instance) uponPrepare(logger *zap.Logger, msg *specqbft.ProcessingMessa i.State.LastPreparedValue = i.State.ProposalAcceptedForCurrentRound.SignedMessage.FullData i.State.LastPreparedRound = i.State.Round - i.metrics.EndStagePrepare() + i.metrics.EndStagePrepare(ctx) logger.Debug("🎯 got prepare quorum", fields.Round(i.State.Round), diff --git a/protocol/v2/qbft/instance/proposal.go b/protocol/v2/qbft/instance/proposal.go index af31dc1560..ffb732b0b4 100644 --- a/protocol/v2/qbft/instance/proposal.go +++ b/protocol/v2/qbft/instance/proposal.go @@ -2,6 +2,7 @@ package instance import ( "bytes" + "context" "github.com/pkg/errors" specqbft "github.com/ssvlabs/ssv-spec/qbft" @@ -15,7 +16,7 @@ import ( // uponProposal process proposal message // Assumes proposal message is valid! -func (i *Instance) uponProposal(logger *zap.Logger, msg *specqbft.ProcessingMessage, proposeMsgContainer *specqbft.MsgContainer) error { +func (i *Instance) uponProposal(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage, proposeMsgContainer *specqbft.MsgContainer) error { addedMsg, err := proposeMsgContainer.AddFirstMsgForSignerAndRound(msg) if err != nil { return errors.Wrap(err, "could not add proposal msg to container") @@ -35,9 +36,9 @@ func (i *Instance) uponProposal(logger *zap.Logger, msg *specqbft.ProcessingMess if msg.QBFTMessage.Round > i.State.Round { i.config.GetTimer().TimeoutForRound(msg.QBFTMessage.Height, msg.QBFTMessage.Round) } - i.bumpToRound(newRound) + i.bumpToRound(ctx, newRound) - i.metrics.EndStageProposal() + i.metrics.EndStageProposal(ctx) // value root r, err := specqbft.HashDataRoot(msg.SignedMessage.FullData) diff --git a/protocol/v2/qbft/instance/round_change.go b/protocol/v2/qbft/instance/round_change.go index cfcc4044c5..1b1eb47d72 100644 --- a/protocol/v2/qbft/instance/round_change.go +++ b/protocol/v2/qbft/instance/round_change.go @@ -2,6 +2,7 @@ package instance import ( "bytes" + "context" "github.com/pkg/errors" specqbft "github.com/ssvlabs/ssv-spec/qbft" @@ -16,6 +17,7 @@ import ( // uponRoundChange process round change messages. // Assumes round change message is valid! func (i *Instance) uponRoundChange( + ctx context.Context, logger *zap.Logger, instanceStartValue []byte, msg *specqbft.ProcessingMessage, @@ -95,7 +97,7 @@ func (i *Instance) uponRoundChange( if newRound <= i.State.Round { return nil // no need to advance round } - err := i.uponChangeRoundPartialQuorum(logger, newRound, instanceStartValue) + err := i.uponChangeRoundPartialQuorum(ctx, logger, newRound, instanceStartValue) if err != nil { return err } @@ -103,8 +105,8 @@ func (i *Instance) uponRoundChange( return nil } -func (i *Instance) uponChangeRoundPartialQuorum(logger *zap.Logger, newRound specqbft.Round, instanceStartValue []byte) error { - i.bumpToRound(newRound) +func (i *Instance) uponChangeRoundPartialQuorum(ctx context.Context, logger *zap.Logger, newRound specqbft.Round, instanceStartValue []byte) error { + i.bumpToRound(ctx, newRound) i.State.ProposalAcceptedForCurrentRound = nil i.config.GetTimer().TimeoutForRound(i.State.Height, i.State.Round) diff --git a/protocol/v2/qbft/instance/timeout.go b/protocol/v2/qbft/instance/timeout.go index f373e0cbd4..0d6bae8974 100644 --- a/protocol/v2/qbft/instance/timeout.go +++ b/protocol/v2/qbft/instance/timeout.go @@ -1,6 +1,8 @@ package instance import ( + "context" + "github.com/pkg/errors" specqbft "github.com/ssvlabs/ssv-spec/qbft" "go.uber.org/zap" @@ -8,7 +10,7 @@ import ( "github.com/ssvlabs/ssv/logging/fields" ) -func (i *Instance) UponRoundTimeout(logger *zap.Logger) error { +func (i *Instance) UponRoundTimeout(ctx context.Context, logger *zap.Logger) error { if !i.CanProcessMessages() { return errors.New("instance stopped processing timeouts") } @@ -20,7 +22,7 @@ func (i *Instance) UponRoundTimeout(logger *zap.Logger) error { // round to be bumped before the round change message was created & broadcasted. // Remember to track the impact of this change and revert/modify if necessary. defer func() { - i.bumpToRound(newRound) + i.bumpToRound(ctx, newRound) i.State.ProposalAcceptedForCurrentRound = nil i.config.GetTimer().TimeoutForRound(i.State.Height, i.State.Round) }() diff --git a/protocol/v2/qbft/spectest/controller_type.go b/protocol/v2/qbft/spectest/controller_type.go index b5a705a754..15cc59c9be 100644 --- a/protocol/v2/qbft/spectest/controller_type.go +++ b/protocol/v2/qbft/spectest/controller_type.go @@ -149,7 +149,7 @@ func testBroadcastedDecided( } func runInstanceWithData(t *testing.T, logger *zap.Logger, height specqbft.Height, contr *controller.Controller, runData *spectests.RunInstanceData) error { - err := contr.StartNewInstance(logger, height, runData.InputValue) + err := contr.StartNewInstance(context.TODO(), logger, height, runData.InputValue) var lastErr error if err != nil { lastErr = err diff --git a/protocol/v2/qbft/spectest/msg_processing_type.go b/protocol/v2/qbft/spectest/msg_processing_type.go index a2945f9397..a341870a89 100644 --- a/protocol/v2/qbft/spectest/msg_processing_type.go +++ b/protocol/v2/qbft/spectest/msg_processing_type.go @@ -1,6 +1,7 @@ package qbft import ( + "context" "encoding/hex" "fmt" "path/filepath" @@ -53,7 +54,7 @@ func RunMsgProcessing(t *testing.T, test *spectests.MsgProcessingSpecTest) { var lastErr error for _, msg := range test.InputMessages { - _, _, _, err := preInstance.ProcessMsg(logger, spectestingutils.ToProcessingMessage(msg)) + _, _, _, err := preInstance.ProcessMsg(context.TODO(), logger, spectestingutils.ToProcessingMessage(msg)) if err != nil { lastErr = err } diff --git a/protocol/v2/qbft/spectest/timeout_type.go b/protocol/v2/qbft/spectest/timeout_type.go index 386c2bee69..2741553363 100644 --- a/protocol/v2/qbft/spectest/timeout_type.go +++ b/protocol/v2/qbft/spectest/timeout_type.go @@ -1,6 +1,7 @@ package qbft import ( + "context" "encoding/hex" "fmt" "testing" @@ -26,7 +27,7 @@ type SpecTest struct { func RunTimeout(t *testing.T, test *SpecTest) { logger := logging.TestLogger(t) - err := test.Pre.UponRoundTimeout(logger) + err := test.Pre.UponRoundTimeout(context.TODO(), logger) if len(test.ExpectedError) != 0 { require.EqualError(t, err, test.ExpectedError) diff --git a/protocol/v2/ssv/runner/aggregator.go b/protocol/v2/ssv/runner/aggregator.go index fe4189d8de..e7dde78f5f 100644 --- a/protocol/v2/ssv/runner/aggregator.go +++ b/protocol/v2/ssv/runner/aggregator.go @@ -126,7 +126,7 @@ func (r *AggregatorRunner) ProcessPreConsensus(ctx context.Context, logger *zap. DataSSZ: byts, } - if err := r.BaseRunner.decide(logger, r, duty.Slot, input); err != nil { + if err := r.BaseRunner.decide(ctx, logger, r, duty.Slot, input); err != nil { return errors.Wrap(err, "can't start new duty runner instance for duty") } diff --git a/protocol/v2/ssv/runner/committee.go b/protocol/v2/ssv/runner/committee.go index 9c4a58d793..fe87246ad8 100644 --- a/protocol/v2/ssv/runner/committee.go +++ b/protocol/v2/ssv/runner/committee.go @@ -694,7 +694,7 @@ func (cr *CommitteeRunner) executeDuty(ctx context.Context, logger *zap.Logger, Target: attData.Target, } - if err := cr.BaseRunner.decide(logger, cr, duty.DutySlot(), vote); err != nil { + if err := cr.BaseRunner.decide(ctx, logger, cr, duty.DutySlot(), vote); err != nil { return errors.Wrap(err, "can't start new duty runner instance for duty") } return nil diff --git a/protocol/v2/ssv/runner/proposer.go b/protocol/v2/ssv/runner/proposer.go index 90f379e098..4b8c8e71cd 100644 --- a/protocol/v2/ssv/runner/proposer.go +++ b/protocol/v2/ssv/runner/proposer.go @@ -149,7 +149,7 @@ func (r *ProposerRunner) ProcessPreConsensus(ctx context.Context, logger *zap.Lo r.measurements.StartConsensus() - if err := r.BaseRunner.decide(logger, r, duty.Slot, input); err != nil { + if err := r.BaseRunner.decide(ctx, logger, r, duty.Slot, input); err != nil { return errors.Wrap(err, "can't start new duty runner instance for duty") } diff --git a/protocol/v2/ssv/runner/runner.go b/protocol/v2/ssv/runner/runner.go index 037658a1a7..306010ff39 100644 --- a/protocol/v2/ssv/runner/runner.go +++ b/protocol/v2/ssv/runner/runner.go @@ -291,7 +291,7 @@ func (b *BaseRunner) didDecideCorrectly(prevDecided bool, signedMessage *spectyp return true, nil } -func (b *BaseRunner) decide(logger *zap.Logger, runner Runner, slot phase0.Slot, input spectypes.Encoder) error { +func (b *BaseRunner) decide(ctx context.Context, logger *zap.Logger, runner Runner, slot phase0.Slot, input spectypes.Encoder) error { byts, err := input.Encode() if err != nil { return errors.Wrap(err, "could not encode input data for consensus") @@ -301,7 +301,9 @@ func (b *BaseRunner) decide(logger *zap.Logger, runner Runner, slot phase0.Slot, return errors.Wrap(err, "input data invalid") } - if err := runner.GetBaseRunner().QBFTController.StartNewInstance(logger, + if err := runner.GetBaseRunner().QBFTController.StartNewInstance( + ctx, + logger, specqbft.Height(slot), byts, ); err != nil { diff --git a/protocol/v2/ssv/runner/sync_committee_aggregator.go b/protocol/v2/ssv/runner/sync_committee_aggregator.go index 2564a5c0da..50c162b7ee 100644 --- a/protocol/v2/ssv/runner/sync_committee_aggregator.go +++ b/protocol/v2/ssv/runner/sync_committee_aggregator.go @@ -156,7 +156,7 @@ func (r *SyncCommitteeAggregatorRunner) ProcessPreConsensus(ctx context.Context, } r.measurements.StartConsensus() - if err := r.BaseRunner.decide(logger, r, input.Duty.Slot, input); err != nil { + if err := r.BaseRunner.decide(ctx, logger, r, input.Duty.Slot, input); err != nil { return errors.Wrap(err, "can't start new duty runner instance for duty") } return nil diff --git a/protocol/v2/ssv/validator/events.go b/protocol/v2/ssv/validator/events.go index 6b5f3a81b5..b97594dfb3 100644 --- a/protocol/v2/ssv/validator/events.go +++ b/protocol/v2/ssv/validator/events.go @@ -19,7 +19,7 @@ func (v *Validator) handleEventMessage(ctx context.Context, logger *zap.Logger, } switch eventMsg.Type { case types.Timeout: - if err := dutyRunner.GetBaseRunner().QBFTController.OnTimeout(logger, *eventMsg); err != nil { + if err := dutyRunner.GetBaseRunner().QBFTController.OnTimeout(ctx, logger, *eventMsg); err != nil { return fmt.Errorf("timeout event: %w", err) } return nil @@ -53,7 +53,7 @@ func (c *Committee) handleEventMessage(ctx context.Context, logger *zap.Logger, return nil } - if err := dutyRunner.GetBaseRunner().QBFTController.OnTimeout(logger, *eventMsg); err != nil { + if err := dutyRunner.GetBaseRunner().QBFTController.OnTimeout(ctx, logger, *eventMsg); err != nil { return fmt.Errorf("timeout event: %w", err) } return nil