Skip to content

Commit

Permalink
Move churner to new metrics framework.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Nov 25, 2024
1 parent b28e075 commit dc82a66
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 83 deletions.
2 changes: 2 additions & 0 deletions common/metrics/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func (m *metrics) Start() error {
}
m.started = true

m.logger.Infof("Starting metrics server at port %d", m.config.HTTPPort)

go func() {
err := m.server.ListenAndServe()
if err != nil && !strings.Contains(err.Error(), "http: Server closed") {
Expand Down
10 changes: 8 additions & 2 deletions operators/churner/churner.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
// For example, when churnBIPsOfOperatorStake=11000, the operator trying to
// register needs to have 1.1 times the stake of the lowest-stake operator.
if new(big.Int).Mul(lowestStake, churnBIPsOfOperatorStake).Cmp(new(big.Int).Mul(operatorToRegisterStake, bipMultiplier)) >= 0 {
c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToRegister)
e := c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToRegister)
if e != nil {
c.logger.Error("Failed to increment failed request num", "error", e)
}
msg := "registering operator must have %f%% more than the stake of the " +
"lowest-stake operator. Block number used for this decision: %d, " +
"registering operator address: %s, registering operator stake: %d, " +
Expand All @@ -249,7 +252,10 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
// (i.e. the lowest-stake operator) needs to have less than 10.01% of the total
// stake.
if new(big.Int).Mul(lowestStake, bipMultiplier).Cmp(new(big.Int).Mul(totalStake, churnBIPsOfTotalStake)) >= 0 {
c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToChurn)
e := c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToChurn)
if e != nil {
c.logger.Error("Failed to increment failed request num", "error", e)
}
msg := "operator to churn out must have less than %f%% of the total stake. " +
"Block number used for this decision: %d, operatorId of the operator " +
"to churn: %x, stake of the operator to churn: %d, total stake in " +
Expand Down
3 changes: 2 additions & 1 deletion operators/churner/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func TestProcessChurnRequest(t *testing.T) {
NumRetries: numRetries,
},
}
metrics := churner.NewMetrics("9001", logger)
metrics, err := churner.NewMetrics(9001, logger)
assert.NoError(t, err)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
assert.NoError(t, err)
assert.NotNil(t, cn)
Expand Down
5 changes: 4 additions & 1 deletion operators/churner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ func run(ctx *cli.Context) error {
logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint)
indexer := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger)

metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)
metrics, err := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)
if err != nil {
log.Fatalf("failed to create metrics: %v", err)
}

cn, err := churner.NewChurner(config, indexer, tx, logger, metrics)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion operators/churner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
PerPublicKeyRateLimit: ctx.GlobalDuration(flags.PerPublicKeyRateLimit.Name),
ChurnApprovalInterval: ctx.GlobalDuration(flags.ChurnApprovalInterval.Name),
MetricsConfig: MetricsConfig{
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
HTTPPort: ctx.GlobalInt(flags.MetricsHTTPPort.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
},
}, nil
Expand Down
4 changes: 2 additions & 2 deletions operators/churner/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ var (
EnvVar: common.PrefixEnvVar(envPrefix, "ENABLE_METRICS"),
}
/* Optional Flags*/
MetricsHTTPPort = cli.StringFlag{
MetricsHTTPPort = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"),
Usage: "the http port which the metrics prometheus server is listening",
Required: false,
Value: "9100",
Value: 9100,
EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"),
}
ChurnApprovalInterval = cli.DurationFlag{
Expand Down
123 changes: 59 additions & 64 deletions operators/churner/metrics.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package churner

import (
"context"
"fmt"
"net/http"
"github.com/Layr-Labs/eigenda/common/metrics"
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
)

Expand All @@ -28,7 +25,7 @@ const (
)

// Note: statusCodeMap must be maintained in sync with failure reason constants.
var statusCodeMap map[FailReason]string = map[FailReason]string{
var statusCodeMap = map[FailReason]string{
FailReasonRateLimitExceeded: codes.ResourceExhausted.String(),
FailReasonInsufficientStakeToRegister: codes.InvalidArgument.String(),
FailReasonInsufficientStakeToChurn: codes.InvalidArgument.String(),
Expand All @@ -40,93 +37,91 @@ var statusCodeMap map[FailReason]string = map[FailReason]string{
}

type MetricsConfig struct {
HTTPPort string
HTTPPort int
EnableMetrics bool
}

type Metrics struct {
registry *prometheus.Registry
metricsServer metrics.Metrics

NumRequests *prometheus.CounterVec
Latency *prometheus.SummaryVec
numRequests metrics.CountMetric
latency metrics.LatencyMetric

httpPort string
logger logging.Logger
logger logging.Logger
}

func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
namespace := "eigenda_churner"
type latencyLabel struct {
method string
}

type numRequestsLabel struct {
status string
method string
reason string
}

func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) {
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

metrics := &Metrics{
NumRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "requests",
Help: "the number of requests",
},
[]string{"status", "reason", "method"},
),
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "latency_ms",
Help: "latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"method"},
),
registry: reg,
httpPort: httpPort,
logger: logger.With("component", "ChurnerMetrics"),
metricsServer := metrics.NewMetrics(logger, &metrics.Config{
Namespace: "eigenda_churner",
HTTPPort: httpPort,
})

numRequests, err := metricsServer.NewCountMetric(
"requests",
"the number of requests",
numRequestsLabel{})
if err != nil {
return nil, err
}
return metrics

latency, err := metricsServer.NewLatencyMetric(
"latency",
"latency summary in milliseconds",
latencyLabel{},
&metrics.Quantile{Quantile: 0.5, Error: 0.05},
&metrics.Quantile{Quantile: 0.9, Error: 0.01},
&metrics.Quantile{Quantile: 0.95, Error: 0.01},
&metrics.Quantile{Quantile: 0.99, Error: 0.001})
if err != nil {
return nil, err
}

return &Metrics{
metricsServer: metricsServer,
numRequests: numRequests,
latency: latency,
logger: logger.With("component", "ChurnerMetrics"),
}, nil
}

// ObserveLatency observes the latency of a stage in 'stage
func (g *Metrics) ObserveLatency(method string, latencyMs float64) {
g.Latency.WithLabelValues(method).Observe(latencyMs)
// ObserveLatency observes the latency of a stage
func (g *Metrics) ObserveLatency(method string, latency time.Duration) error {
return g.latency.ReportLatency(latency, latencyLabel{method: method})
}

// IncrementSuccessfulRequestNum increments the number of successful requests
func (g *Metrics) IncrementSuccessfulRequestNum(method string) {
g.NumRequests.With(prometheus.Labels{
"status": "success",
"method": method,
"reason": "",
}).Inc()
func (g *Metrics) IncrementSuccessfulRequestNum(method string) error {
return g.numRequests.Increment(numRequestsLabel{status: "success", method: method})
}

// IncrementFailedRequestNum increments the number of failed requests
func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) {
func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) error {
code, ok := statusCodeMap[reason]
if !ok {
g.logger.Error("cannot map failure reason to status code", "failure reason", reason)
// Treat this as an internal server error. This is a conservative approach to
// handle a negligence of mapping from failure reason to status code.
code = codes.Internal.String()
}
g.NumRequests.With(prometheus.Labels{
"status": code,
"reason": string(reason),
"method": method,
}).Inc()

return g.numRequests.Increment(numRequestsLabel{status: code, reason: string(reason), method: method})
}

// Start starts the metrics server
func (g *Metrics) Start(ctx context.Context) {
g.logger.Info("Starting metrics server at ", "port", g.httpPort)
addr := fmt.Sprintf(":%s", g.httpPort)
go func() {
log := g.logger
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(
g.registry,
promhttp.HandlerOpts{},
))
err := http.ListenAndServe(addr, mux)
log.Error("Prometheus server failed", "err", err)
}()
func (g *Metrics) Start() error {
return g.metricsServer.Start()
}
48 changes: 38 additions & 10 deletions operators/churner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ func NewServer(
func (s *Server) Start(metricsConfig MetricsConfig) error {
// Enable Metrics Block
if metricsConfig.EnableMetrics {
httpSocket := fmt.Sprintf(":%s", metricsConfig.HTTPPort)
s.metrics.Start(context.Background())
httpSocket := fmt.Sprintf(":%d", metricsConfig.HTTPPort)
err := s.metrics.Start()
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}

s.logger.Info("Enabled metrics for Churner", "socket", httpSocket)
}
return nil
Expand All @@ -57,39 +61,57 @@ func (s *Server) Start(metricsConfig MetricsConfig) error {
func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnReply, error) {
err := s.validateChurnRequest(ctx, req)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidRequest)
e := s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidRequest)
if e != nil {
s.logger.Error("Failed to increment failed request num", "error", e)
}
return nil, api.NewErrorInvalidArg(fmt.Sprintf("invalid request: %s", err.Error()))
}

timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("Churn", f*1000) // make milliseconds
e := s.metrics.ObserveLatency("Churn", time.Duration(f*float64(time.Second)))
if e != nil {
s.logger.Error("Failed to increment failed request num", "error", e)
}
}))
defer timer.ObserveDuration()
s.logger.Info("Received request: ", "QuorumIds", req.GetQuorumIds())

now := time.Now()
// Global rate limiting: check that we are after the previous approval's expiry
if now.Unix() < s.latestExpiry {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonPrevApprovalNotExpired)
e := s.metrics.IncrementFailedRequestNum("Churn", FailReasonPrevApprovalNotExpired)
if e != nil {
s.logger.Error("Failed to increment failed request num", "error", e)
}
return nil, api.NewErrorResourceExhausted(fmt.Sprintf("previous approval not expired, retry in %d seconds", s.latestExpiry-now.Unix()))
}

request, err := createChurnRequest(req)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidRequest)
e := s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidRequest)
if e != nil {
s.logger.Error("Failed to increment failed request num", "error", e)
}
return nil, api.NewErrorInvalidArg(err.Error())
}

operatorToRegisterAddress, err := s.churner.VerifyRequestSignature(ctx, request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidSignature)
e := s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidSignature)
if e != nil {
s.logger.Error("Failed to increment failed request num", "error", e)
}
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to verify request signature: %s", err.Error()))
}

// Per-operator rate limiting: check if the request should be rate limited
err = s.checkShouldBeRateLimited(now, *request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonRateLimitExceeded)
e := s.metrics.IncrementFailedRequestNum("Churn", FailReasonRateLimitExceeded)
if e != nil {
s.logger.Error("Failed to increment failed request num", "error", e)
}
return nil, api.NewErrorResourceExhausted(fmt.Sprintf("rate limiter error: %s", err.Error()))
}

Expand All @@ -98,7 +120,10 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
if _, ok := status.FromError(err); ok {
return nil, err
}
s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
e := s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
if e != nil {
s.logger.Error("Failed to increment failed request num", "error", e)
}
return nil, api.NewErrorInternal(fmt.Sprintf("failed to process churn request: %s", err.Error()))
}

Expand All @@ -107,7 +132,10 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl

operatorsToChurn := convertToOperatorsToChurnGrpc(response.OperatorsToChurn)

s.metrics.IncrementSuccessfulRequestNum("Churn")
e := s.metrics.IncrementSuccessfulRequestNum("Churn")
if e != nil {
s.logger.Error("Failed to increment failed request num", "error", e)
}
return &pb.ChurnReply{
SignatureWithSaltAndExpiry: &pb.SignatureWithSaltAndExpiry{
Signature: response.SignatureWithSaltAndExpiry.Signature,
Expand Down
3 changes: 2 additions & 1 deletion operators/churner/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ func newTestServer(t *testing.T) *churner.Server {

setupMockWriter()

metrics := churner.NewMetrics("9001", logger)
metrics, err := churner.NewMetrics(9001, logger)
assert.NoError(t, err)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
if err != nil {
log.Fatalln("cannot create churner", err)
Expand Down
3 changes: 2 additions & 1 deletion operators/churner/tests/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ func newTestServer(t *testing.T) *churner.Server {
)
assert.NoError(t, err)

metrics := churner.NewMetrics("9001", logger)
metrics, err := churner.NewMetrics(9001, logger)
assert.NoError(t, err)
cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger, metrics)
assert.NoError(t, err)

Expand Down

0 comments on commit dc82a66

Please sign in to comment.