Skip to content

Commit

Permalink
Observability - instrument QBFT component, ctx propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-ssvlabs committed Dec 3, 2024
1 parent da9df9b commit 1c6fb59
Show file tree
Hide file tree
Showing 20 changed files with 139 additions and 90 deletions.
10 changes: 5 additions & 5 deletions protocol/v2/qbft/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewController(
}

// StartNewInstance will start a new QBFT instance, if can't will return error
func (c *Controller) StartNewInstance(logger *zap.Logger, height specqbft.Height, value []byte) error {
func (c *Controller) StartNewInstance(ctx context.Context, logger *zap.Logger, height specqbft.Height, value []byte) error {

if err := c.GetConfig().GetValueCheckF()(value); err != nil {
return errors.Wrap(err, "value invalid")
Expand All @@ -71,7 +71,7 @@ func (c *Controller) StartNewInstance(logger *zap.Logger, height specqbft.Height
c.Height = height

newInstance := c.addAndStoreNewInstance()
newInstance.Start(logger, value, height)
newInstance.Start(ctx, logger, value, height)
c.forceStopAllInstanceExceptCurrent()
return nil
}
Expand Down Expand Up @@ -118,10 +118,10 @@ func (c *Controller) ProcessMsg(ctx context.Context, logger *zap.Logger, signedM
return nil, fmt.Errorf("future msg from height, could not process")
}

return c.UponExistingInstanceMsg(logger, msg)
return c.UponExistingInstanceMsg(ctx, logger, msg)
}

func (c *Controller) UponExistingInstanceMsg(logger *zap.Logger, msg *specqbft.ProcessingMessage) (*spectypes.SignedSSVMessage, error) {
func (c *Controller) UponExistingInstanceMsg(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage) (*spectypes.SignedSSVMessage, error) {
inst := c.StoredInstances.FindInstance(msg.QBFTMessage.Height)
if inst == nil {
return nil, errors.New("instance not found")
Expand All @@ -134,7 +134,7 @@ func (c *Controller) UponExistingInstanceMsg(logger *zap.Logger, msg *specqbft.P
return nil, errors.New("not processing consensus message since instance is already decided")
}

decided, _, decidedMsg, err := inst.ProcessMsg(logger, msg)
decided, _, decidedMsg, err := inst.ProcessMsg(ctx, logger, msg)
if err != nil {
return nil, errors.Wrap(err, "could not process msg")
}
Expand Down
5 changes: 3 additions & 2 deletions protocol/v2/qbft/controller/controller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"context"
"encoding/json"
"testing"

Expand Down Expand Up @@ -82,7 +83,7 @@ func TestController_OnTimeoutWithRoundCheck(t *testing.T) {
contr.StoredInstances.addNewInstance(inst)

// Call OnTimeout and capture the error
err = contr.OnTimeout(logger, *msg)
err = contr.OnTimeout(context.TODO(), logger, *msg)

// Assert that the error is nil and the round did not bump
require.NoError(t, err)
Expand All @@ -92,7 +93,7 @@ func TestController_OnTimeoutWithRoundCheck(t *testing.T) {
inst.State.Round = specqbft.FirstRound

// Call OnTimeout and capture the error
err = contr.OnTimeout(logger, *msg)
err = contr.OnTimeout(context.TODO(), logger, *msg)

// Assert that the error is nil and the round did bump
require.NoError(t, err)
Expand Down
6 changes: 4 additions & 2 deletions protocol/v2/qbft/controller/timer.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package controller

import (
"context"

"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/protocol/v2/types"
)

// OnTimeout is trigger upon timeout for the given height
func (c *Controller) OnTimeout(logger *zap.Logger, msg types.EventMsg) error {
func (c *Controller) OnTimeout(ctx context.Context, logger *zap.Logger, msg types.EventMsg) error {
// TODO add validation

timeoutData, err := msg.GetTimeoutData()
Expand All @@ -28,5 +30,5 @@ func (c *Controller) OnTimeout(logger *zap.Logger, msg types.EventMsg) error {
if decided, _ := instance.IsDecided(); decided {
return nil
}
return instance.UponRoundTimeout(logger)
return instance.UponRoundTimeout(ctx, logger)
}
5 changes: 3 additions & 2 deletions protocol/v2/qbft/instance/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package instance

import (
"bytes"
"context"
"sort"

"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ import (

// UponCommit returns true if a quorum of commit messages was received.
// Assumes commit message is valid!
func (i *Instance) UponCommit(logger *zap.Logger, msg *specqbft.ProcessingMessage, commitMsgContainer *specqbft.MsgContainer) (bool, []byte, *spectypes.SignedSSVMessage, error) {
func (i *Instance) UponCommit(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage, commitMsgContainer *specqbft.MsgContainer) (bool, []byte, *spectypes.SignedSSVMessage, error) {
logger.Debug("📬 got commit message",
fields.Round(i.State.Round),
zap.Any("commit_signers", msg.SignedMessage.OperatorIDs),
Expand Down Expand Up @@ -49,7 +50,7 @@ func (i *Instance) UponCommit(logger *zap.Logger, msg *specqbft.ProcessingMessag
zap.Any("agg_signers", agg.OperatorIDs),
fields.Root(msg.QBFTMessage.Root))

i.metrics.EndStageCommit()
i.metrics.EndStageCommit(ctx)

return true, fullData, agg, nil
}
Expand Down
19 changes: 10 additions & 9 deletions protocol/v2/qbft/instance/instance.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package instance

import (
"context"
"encoding/base64"
"encoding/json"
"sync"
Expand Down Expand Up @@ -69,10 +70,10 @@ func (i *Instance) ForceStop() {
}

// Start is an interface implementation
func (i *Instance) Start(logger *zap.Logger, value []byte, height specqbft.Height) {
func (i *Instance) Start(ctx context.Context, logger *zap.Logger, value []byte, height specqbft.Height) {
i.startOnce.Do(func() {
i.StartValue = value
i.bumpToRound(specqbft.FirstRound)
i.bumpToRound(ctx, specqbft.FirstRound)
i.State.Height = height
i.metrics.StartStage()
i.config.GetTimer().TimeoutForRound(height, specqbft.FirstRound)
Expand Down Expand Up @@ -126,7 +127,7 @@ func allSigners(all []*specqbft.ProcessingMessage) []spectypes.OperatorID {
}

// ProcessMsg processes a new QBFT msg, returns non nil error on msg processing error
func (i *Instance) ProcessMsg(logger *zap.Logger, msg *specqbft.ProcessingMessage) (decided bool, decidedValue []byte, aggregatedCommit *spectypes.SignedSSVMessage, err error) {
func (i *Instance) ProcessMsg(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage) (decided bool, decidedValue []byte, aggregatedCommit *spectypes.SignedSSVMessage, err error) {
if !i.CanProcessMessages() {
return false, nil, nil, errors.New("instance stopped processing messages")
}
Expand All @@ -139,18 +140,18 @@ func (i *Instance) ProcessMsg(logger *zap.Logger, msg *specqbft.ProcessingMessag

switch msg.QBFTMessage.MsgType {
case specqbft.ProposalMsgType:
return i.uponProposal(logger, msg, i.State.ProposeContainer)
return i.uponProposal(ctx, logger, msg, i.State.ProposeContainer)
case specqbft.PrepareMsgType:
return i.uponPrepare(logger, msg, i.State.PrepareContainer)
return i.uponPrepare(ctx, logger, msg, i.State.PrepareContainer)
case specqbft.CommitMsgType:
decided, decidedValue, aggregatedCommit, err = i.UponCommit(logger, msg, i.State.CommitContainer)
decided, decidedValue, aggregatedCommit, err = i.UponCommit(ctx, logger, msg, i.State.CommitContainer)
if decided {
i.State.Decided = decided
i.State.DecidedValue = decidedValue
}
return err
case specqbft.RoundChangeMsgType:
return i.uponRoundChange(logger, i.StartValue, msg, i.State.RoundChangeContainer, i.config.GetValueCheckF())
return i.uponRoundChange(ctx, logger, i.StartValue, msg, i.State.RoundChangeContainer, i.config.GetValueCheckF())
default:
return errors.New("signed message type not supported")
}
Expand Down Expand Up @@ -249,9 +250,9 @@ func (i *Instance) Decode(data []byte) error {
}

// bumpToRound sets round and sends current round metrics.
func (i *Instance) bumpToRound(round specqbft.Round) {
func (i *Instance) bumpToRound(ctx context.Context, round specqbft.Round) {
i.State.Round = round
i.metrics.SetRound(round)
i.metrics.SetRound(ctx, round)
}

// CanProcessMessages will return true if instance can process messages
Expand Down
79 changes: 30 additions & 49 deletions protocol/v2/qbft/instance/metrics.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,56 @@
package instance

import (
"context"
"math"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
"go.uber.org/zap"
"go.opentelemetry.io/otel/metric"
)

var (
metricsStageDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "ssv_validator_instance_stage_duration_seconds",
Help: "Instance stage duration (seconds)",
Buckets: []float64{0.02, 0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 5},
}, []string{"stage"})
metricsRound = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ssv_qbft_instance_round",
Help: "QBFT instance round",
}, []string{"roleType"})
)

func init() {
allMetrics := []prometheus.Collector{
metricsStageDuration,
metricsRound,
}
logger := zap.L()
for _, c := range allMetrics {
if err := prometheus.Register(c); err != nil {
logger.Debug("could not register prometheus collector")
}
}
}

type metrics struct {
StageStart time.Time
proposalDuration prometheus.Observer
prepareDuration prometheus.Observer
commitDuration prometheus.Observer
round prometheus.Gauge
stageStart time.Time
role string
}

func newMetrics(role string) *metrics {
return &metrics{
proposalDuration: metricsStageDuration.WithLabelValues("proposal"),
prepareDuration: metricsStageDuration.WithLabelValues("prepare"),
commitDuration: metricsStageDuration.WithLabelValues("commit"),
round: metricsRound.WithLabelValues(role),
role: role,
}
}

func (m *metrics) StartStage() {
m.StageStart = time.Now()
m.stageStart = time.Now()
}

func (m *metrics) EndStageProposal() {
m.proposalDuration.Observe(time.Since(m.StageStart).Seconds())
m.StageStart = time.Now()
func (m *metrics) EndStageProposal(ctx context.Context) {
validatorStageDurationHistogram.Record(
ctx,
time.Since(m.stageStart).Seconds(),
metric.WithAttributes(stageAttribute(proposalStage)))
m.stageStart = time.Now()
}

func (m *metrics) EndStagePrepare() {
m.prepareDuration.Observe(time.Since(m.StageStart).Seconds())
m.StageStart = time.Now()
func (m *metrics) EndStagePrepare(ctx context.Context) {
validatorStageDurationHistogram.Record(
ctx,
time.Since(m.stageStart).Seconds(),
metric.WithAttributes(stageAttribute(prepareStage)))
m.stageStart = time.Now()
}

func (m *metrics) EndStageCommit() {
m.commitDuration.Observe(time.Since(m.StageStart).Seconds())
m.StageStart = time.Now()
func (m *metrics) EndStageCommit(ctx context.Context) {
validatorStageDurationHistogram.Record(
ctx,
time.Since(m.stageStart).Seconds(),
metric.WithAttributes(stageAttribute(commitStage)))
m.stageStart = time.Now()
}

func (m *metrics) SetRound(round specqbft.Round) {
m.round.Set(float64(round))
func (m *metrics) SetRound(ctx context.Context, round specqbft.Round) {
convertedRound := uint64(round)
if convertedRound <= math.MaxInt64 {
roundGauge.Record(ctx, int64(convertedRound), metric.WithAttributes(roleAttribute(m.role)))
}
}
53 changes: 53 additions & 0 deletions protocol/v2/qbft/instance/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package instance

import (
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/ssvlabs/ssv/observability"
)

const (
observabilityName = "github.com/ssvlabs/ssv/protocol/v2/qbft"
observabilityNamespace = "ssv.validator"
)

type stage string

const (
proposalStage stage = "proposal"
prepareStage stage = "prepare"
commitStage stage = "commit"
)

var (
meter = otel.Meter(observabilityName)

validatorStageDurationHistogram = observability.NewMetric(
meter.Float64Histogram(
metricName("stage.duration"),
metric.WithUnit("s"),
metric.WithDescription("validator stage(proposal, prepare, commit) duration"),
metric.WithExplicitBucketBoundaries(observability.SecondsHistogramBuckets...)))

roundGauge = observability.NewMetric(
meter.Int64Gauge(
metricName("round"),
metric.WithUnit("{round}"),
metric.WithDescription("QBFT instance round")))
)

func metricName(name string) string {
return fmt.Sprintf("%s.%s", observabilityNamespace, name)
}

func stageAttribute(stage stage) attribute.KeyValue {
return attribute.String("ssv.validator.stage", string(stage))
}

func roleAttribute(role string) attribute.KeyValue {
return attribute.String("ssv.runner.role", role)
}
5 changes: 3 additions & 2 deletions protocol/v2/qbft/instance/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package instance

import (
"bytes"
"context"

"github.com/pkg/errors"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
Expand All @@ -15,7 +16,7 @@ import (

// uponPrepare process prepare message
// Assumes prepare message is valid!
func (i *Instance) uponPrepare(logger *zap.Logger, msg *specqbft.ProcessingMessage, prepareMsgContainer *specqbft.MsgContainer) error {
func (i *Instance) uponPrepare(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage, prepareMsgContainer *specqbft.MsgContainer) error {
hasQuorumBefore := specqbft.HasQuorum(i.State.CommitteeMember, prepareMsgContainer.MessagesForRound(i.State.Round))

addedMsg, err := prepareMsgContainer.AddFirstMsgForSignerAndRound(msg)
Expand Down Expand Up @@ -43,7 +44,7 @@ func (i *Instance) uponPrepare(logger *zap.Logger, msg *specqbft.ProcessingMessa
i.State.LastPreparedValue = i.State.ProposalAcceptedForCurrentRound.SignedMessage.FullData
i.State.LastPreparedRound = i.State.Round

i.metrics.EndStagePrepare()
i.metrics.EndStagePrepare(ctx)

logger.Debug("🎯 got prepare quorum",
fields.Round(i.State.Round),
Expand Down
7 changes: 4 additions & 3 deletions protocol/v2/qbft/instance/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package instance

import (
"bytes"
"context"

"github.com/pkg/errors"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
Expand All @@ -15,7 +16,7 @@ import (

// uponProposal process proposal message
// Assumes proposal message is valid!
func (i *Instance) uponProposal(logger *zap.Logger, msg *specqbft.ProcessingMessage, proposeMsgContainer *specqbft.MsgContainer) error {
func (i *Instance) uponProposal(ctx context.Context, logger *zap.Logger, msg *specqbft.ProcessingMessage, proposeMsgContainer *specqbft.MsgContainer) error {
addedMsg, err := proposeMsgContainer.AddFirstMsgForSignerAndRound(msg)
if err != nil {
return errors.Wrap(err, "could not add proposal msg to container")
Expand All @@ -35,9 +36,9 @@ func (i *Instance) uponProposal(logger *zap.Logger, msg *specqbft.ProcessingMess
if msg.QBFTMessage.Round > i.State.Round {
i.config.GetTimer().TimeoutForRound(msg.QBFTMessage.Height, msg.QBFTMessage.Round)
}
i.bumpToRound(newRound)
i.bumpToRound(ctx, newRound)

i.metrics.EndStageProposal()
i.metrics.EndStageProposal(ctx)

// value root
r, err := specqbft.HashDataRoot(msg.SignedMessage.FullData)
Expand Down
Loading

0 comments on commit 1c6fb59

Please sign in to comment.