Skip to content

Commit

Permalink
Add initial metrics to Churn server (#11)
Browse files Browse the repository at this point in the history
Signed-off-by: Wellington Barbosa <[email protected]>
Co-authored-by: Wellington Barbosa <[email protected]>
  • Loading branch information
wmb-software-consulting and Wellington Barbosa authored Nov 14, 2023
1 parent 54a9807 commit 90f6169
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 12 deletions.
5 changes: 5 additions & 0 deletions churner/churner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ type churner struct {

privateKey *ecdsa.PrivateKey
logger common.Logger
metrics *Metrics
}

func NewChurner(
config *Config,
indexer thegraph.IndexedChainState,
transactor core.Transactor,
logger common.Logger,
metrics *Metrics,
) (*churner, error) {
privateKey, err := crypto.HexToECDSA(config.EthClientConfig.PrivateKeyString)
if err != nil {
Expand All @@ -70,6 +72,7 @@ func NewChurner(

privateKey: privateKey,
logger: logger,
metrics: metrics,
}, nil
}

Expand Down Expand Up @@ -207,12 +210,14 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
// verify the lowest stake against the registering operator's stake
// make sure that: lowestStake * churnBIPsOfOperatorStake < operatorToRegisterStake * bipMultiplier
if new(big.Int).Mul(lowestStake, churnBIPsOfOperatorStake).Cmp(new(big.Int).Mul(operatorToRegisterStake, bipMultiplier)) >= 0 {
c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToRegister)
return nil, errors.New("registering operator has less than churnBIPsOfOperatorStake")
}

// verify the lowest stake against the total stake
// make sure that: lowestStake * bipMultiplier < totalStake * churnBIPsOfTotalStake
if new(big.Int).Mul(lowestStake, bipMultiplier).Cmp(new(big.Int).Mul(totalStake, churnBIPsOfTotalStake)) >= 0 {
c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToChurn)
return nil, errors.New("operator to churn has less than churnBIPSOfTotalStake")
}

Expand Down
3 changes: 2 additions & 1 deletion churner/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func TestProcessChurnRequest(t *testing.T) {
PrivateKeyString: churnerPrivateKeyHex,
},
}
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger)
metrics := churner.NewMetrics("9001", logger)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
assert.NoError(t, err)
assert.NotNil(t, cn)

Expand Down
8 changes: 4 additions & 4 deletions churner/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"fmt"
"log"
"net"
Expand Down Expand Up @@ -81,14 +80,15 @@ func run(ctx *cli.Context) error {

querier := graphql.NewClient(config.GraphUrl, nil)
indexer := thegraph.NewIndexedChainState(cs, querier, logger)
metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)

cn, err := churner.NewChurner(config, indexer, tx, logger)
cn, err := churner.NewChurner(config, indexer, tx, logger, metrics)
if err != nil {
log.Fatalln("cannot create churner", err)
}

churnerServer := churner.NewServer(config, cn, logger)
if err = churnerServer.Start(context.Background()); err != nil {
churnerServer := churner.NewServer(config, cn, logger, metrics)
if err = churnerServer.Start(config.MetricsConfig); err != nil {
log.Fatalln("failed to start churner server", err)
}

Expand Down
5 changes: 5 additions & 0 deletions churner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
EthClientConfig geth.EthClientConfig
LoggerConfig logging.Config
GraphUrl string
MetricsConfig MetricsConfig

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
Expand All @@ -28,5 +29,9 @@ func NewConfig(ctx *cli.Context) *Config {
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
PerPublicKeyRateLimit: ctx.GlobalDuration(flags.PerPublicKeyRateLimit.Name),
MetricsConfig: MetricsConfig{
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
},
}
}
16 changes: 16 additions & 0 deletions churner/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ var (
EnvVar: common.PrefixEnvVar(envPrefix, "PER_PUBLIC_KEY_RATE_LIMIT"),
Value: 24 * time.Hour,
}
// EnableMetrics is the required boolean flag whose value is copied into
// MetricsConfig.EnableMetrics; it can also be supplied through the
// ENABLE_METRICS environment variable (with the package's env prefix).
EnableMetrics = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-metrics"),
Usage: "start metrics server",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "ENABLE_METRICS"),
}
/* Optional Flags*/
// MetricsHTTPPort is the optional flag selecting the port of the Prometheus
// metrics HTTP server. It defaults to "9100" and can also be supplied through
// the METRICS_HTTP_PORT environment variable (with the package's env prefix).
MetricsHTTPPort = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"),
Usage: "the http port which the metrics prometheus server is listening",
Required: false,
Value: "9100",
EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"),
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -65,10 +79,12 @@ var requiredFlags = []cli.Flag{
GraphUrlFlag,
BlsOperatorStateRetrieverFlag,
EigenDAServiceManagerFlag,
EnableMetrics,
}

var optionalFlags = []cli.Flag{
PerPublicKeyRateLimit,
MetricsHTTPPort,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
110 changes: 110 additions & 0 deletions churner/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package churner

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/Layr-Labs/eigenda/common"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// FailReason is the value recorded in the "reason" label of the request
// counter when a request fails.
type FailReason string

// Failure reasons reported through Metrics.IncrementFailedRequestNum.
const (
FailReasonRateLimitExceeded FailReason = "rate_limit_exceeded" // Rate limited: per operator rate limiting
FailReasonInsufficientStakeToRegister FailReason = "insufficient_stake_to_register" // Operator doesn't have enough stake to be registered
FailReasonInsufficientStakeToChurn FailReason = "insufficient_stake_to_churn" // Operator doesn't have enough stake to be churned
FailReasonQuorumIdOutOfRange FailReason = "quorum_id_out_of_range" // Quorum ID out of range: quorum is not in the range of [0, QuorumCount)
FailReasonPrevApprovalNotExpired FailReason = "prev_approval_not_expired" // Expiry: previous approval hasn't expired
FailReasonInvalidSignature FailReason = "invalid_signature" // Invalid signature: operator's signature is wrong
FailReasonProcessChurnRequestFailed FailReason = "failed_process_churn_request" // Failed to process churn request
)

// MetricsConfig holds the settings for the churner's metrics HTTP server.
type MetricsConfig struct {
// HTTPPort is the port of the metrics server, given without a leading colon.
HTTPPort string
// EnableMetrics controls whether Server.Start launches the metrics server.
EnableMetrics bool
}

// Metrics bundles the churner's Prometheus collectors together with the
// registry they are registered on and the settings needed to serve them.
type Metrics struct {
// registry holds every collector created by NewMetrics and is what the
// /metrics endpoint serves.
registry *prometheus.Registry

// NumRequests counts requests, labeled by "status", "reason" and "method".
NumRequests *prometheus.CounterVec
// Latency summarizes request latency in milliseconds, labeled by "method".
Latency *prometheus.SummaryVec

// httpPort is the port the metrics server listens on (no leading colon).
httpPort string
logger common.Logger
}

// NewMetrics creates a dedicated Prometheus registry, registers the process
// and Go runtime collectors on it, and returns a Metrics whose request counter
// and latency summary are registered on that same registry. The HTTP server is
// not started here; call Start for that.
func NewMetrics(httpPort string, logger common.Logger) *Metrics {
	const namespace = "eigenda_churner"

	registry := prometheus.NewRegistry()
	registry.MustRegister(
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
		collectors.NewGoCollector(),
	)

	// Everything built through this factory is auto-registered on registry.
	factory := promauto.With(registry)

	requestCounter := factory.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "requests",
			Help:      "the number of requests",
		},
		[]string{"status", "reason", "method"},
	)

	latencySummary := factory.NewSummaryVec(
		prometheus.SummaryOpts{
			Namespace:  namespace,
			Name:       "latency_ms",
			Help:       "latency summary in milliseconds",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
		},
		[]string{"method"},
	)

	return &Metrics{
		registry:    registry,
		NumRequests: requestCounter,
		Latency:     latencySummary,
		httpPort:    httpPort,
		logger:      logger,
	}
}

// ObserveLatency records latencyMs, a duration in milliseconds, in the latency
// summary under the given method label.
func (g *Metrics) ObserveLatency(method string, latencyMs float64) {
	observer := g.Latency.WithLabelValues(method)
	observer.Observe(latencyMs)
}

// IncrementSuccessfulRequestNum adds one to the request counter for method
// with status "success" and an empty reason.
func (g *Metrics) IncrementSuccessfulRequestNum(method string) {
	labels := prometheus.Labels{
		"method": method,
		"reason": "",
		"status": "success",
	}
	g.NumRequests.With(labels).Inc()
}

// IncrementFailedRequestNum adds one to the request counter for method with
// status "failed" and the given failure reason.
func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) {
	labels := prometheus.Labels{
		"method": method,
		"reason": string(reason),
		"status": "failed",
	}
	g.NumRequests.With(labels).Inc()
}

// Start launches the Prometheus metrics HTTP server in the background and
// returns immediately. The server exposes the registry at /metrics on the
// configured port.
//
// The server runs until ctx is cancelled, at which point it is shut down
// gracefully. If ctx can never be cancelled (for example context.Background()),
// the server runs for the lifetime of the process, as before.
func (g *Metrics) Start(ctx context.Context) {
	g.logger.Info("Starting metrics server at ", "port", g.httpPort)

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(
		g.registry,
		promhttp.HandlerOpts{},
	))

	server := &http.Server{
		Addr:    fmt.Sprintf(":%s", g.httpPort),
		Handler: mux,
		// Bound header reads so a stalled client cannot pin a connection forever.
		ReadHeaderTimeout: 10 * time.Second,
	}

	go func() {
		err := server.ListenAndServe()
		// ErrServerClosed is the expected result of a requested shutdown.
		if errors.Is(err, http.ErrServerClosed) {
			return
		}
		g.logger.Error("Prometheus server failed", "err", err)
	}()

	// A nil Done channel means ctx is never cancelled; do not park a goroutine on it.
	done := ctx.Done()
	if done == nil {
		return
	}

	go func() {
		<-done
		// ctx is already cancelled, so the shutdown deadline needs a fresh context.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			g.logger.Error("Prometheus server shutdown failed", "err", err)
		}
	}()
}
25 changes: 22 additions & 3 deletions churner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
pb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/prometheus/client_golang/prometheus"
)

type Server struct {
Expand All @@ -19,34 +20,47 @@ type Server struct {
latestExpiry int64
lastRequestTimeByOperatorID map[core.OperatorID]time.Time

logger common.Logger
logger common.Logger
metrics *Metrics
}

// NewServer assembles a Server around the given churner, logger and metrics.
// The latest approval expiry starts at zero and the per-operator request-time
// map starts empty.
func NewServer(
	config *Config,
	churner *churner,
	logger common.Logger,
	metrics *Metrics,
) *Server {
	srv := &Server{
		config:  config,
		churner: churner,
		logger:  logger,
		metrics: metrics,
	}
	srv.latestExpiry = int64(0)
	srv.lastRequestTimeByOperatorID = make(map[core.OperatorID]time.Time)
	return srv
}

func (s *Server) Start(ctx context.Context) error {
// TODO: Start Metrics
// Start launches the metrics server when metricsConfig.EnableMetrics is set
// and logs the socket it was configured with; otherwise it does nothing.
// It always returns nil.
func (s *Server) Start(metricsConfig MetricsConfig) error {
	if !metricsConfig.EnableMetrics {
		return nil
	}

	socket := fmt.Sprintf(":%s", metricsConfig.HTTPPort)
	s.metrics.Start(context.Background())
	s.logger.Info("Enabled metrics for Churner", "socket", socket)
	return nil
}

func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("Churn", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()
s.logger.Info("Received request: ", "QuorumIds", req.GetQuorumIds())

now := time.Now()
// check that we are after the previous approval's expiry
if now.Unix() < s.latestExpiry {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonPrevApprovalNotExpired)
return nil, fmt.Errorf("previous approval not expired, retry in %d", s.latestExpiry-now.Unix())
}

Expand All @@ -58,6 +72,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
}

if quorumID >= int(s.churner.QuorumCount) {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonQuorumIdOutOfRange)
return nil, fmt.Errorf("Invalid request: the quorum_id must be in range [0, %d], but found %d", s.churner.QuorumCount-1, quorumID)
}
}
Expand All @@ -67,17 +82,20 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl

operatorToRegisterAddress, err := s.churner.VerifyRequestSignature(ctx, request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidSignature)
return nil, fmt.Errorf("failed to verify request signature: %w", err)
}

// check if the request should be rate limited
err = s.checkShouldBeRateLimited(now, *request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonRateLimitExceeded)
return nil, fmt.Errorf("rate limiter error: %w", err)
}

response, err := s.churner.ProcessChurnRequest(ctx, operatorToRegisterAddress, request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
return nil, fmt.Errorf("failed to process churn request: %w", err)
}

Expand All @@ -86,6 +104,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl

operatorsToChurn := convertToOperatorsToChurnGrpc(response.OperatorsToChurn)

s.metrics.IncrementSuccessfulRequestNum("Churn")
return &pb.ChurnReply{
SignatureWithSaltAndExpiry: &pb.SignatureWithSaltAndExpiry{
Signature: response.SignatureWithSaltAndExpiry.Signature,
Expand Down
5 changes: 3 additions & 2 deletions churner/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,13 @@ func newTestServer(t *testing.T) *churner.Server {

setupMockTransactor()

cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger)
metrics := churner.NewMetrics("9001", logger)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
if err != nil {
log.Fatalln("cannot create churner", err)
}

return churner.NewServer(config, cn, logger)
return churner.NewServer(config, cn, logger, metrics)
}

func makeOperatorId(id int) dacore.OperatorID {
Expand Down
5 changes: 3 additions & 2 deletions churner/tests/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ func newTestServer(t *testing.T) *churner.Server {
)
assert.NoError(t, err)

cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger)
metrics := churner.NewMetrics("9001", logger)
cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger, metrics)
assert.NoError(t, err)

return churner.NewServer(config, cn, logger)
return churner.NewServer(config, cn, logger, metrics)
}
3 changes: 3 additions & 0 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (env *Config) generateChurnerVars(ind int, graphUrl, logPath, grpcPort stri
CHURNER_STD_LOG_LEVEL: "debug",
CHURNER_FILE_LOG_LEVEL: "trace",
CHURNER_LOG_PATH: logPath,

CHURNER_ENABLE_METRICS: "true",
CHURNER_METRICS_HTTP_PORT: "9095",
}

env.applyDefaults(&v, "CHURNER", "churner", ind)
Expand Down
4 changes: 4 additions & 0 deletions inabox/deploy/env_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@ type ChurnerVars struct {
CHURNER_LOG_PATH string

CHURNER_INDEXER_PULL_INTERVAL string

CHURNER_ENABLE_METRICS string

CHURNER_METRICS_HTTP_PORT string
}

func (vars ChurnerVars) getEnvMap() map[string]string {
Expand Down

0 comments on commit 90f6169

Please sign in to comment.