diff --git a/metrics.md b/metrics.md new file mode 100644 index 000000000..72c61a314 --- /dev/null +++ b/metrics.md @@ -0,0 +1,4 @@ +# EigenDA Metrics Documentation + +- [churner](operators/churner/churner-metrics.md) + diff --git a/operators/churner/churner-metrics.md b/operators/churner/churner-metrics.md new file mode 100644 index 000000000..7a6b04180 --- /dev/null +++ b/operators/churner/churner-metrics.md @@ -0,0 +1,33 @@ +# Metrics Documentation for namespace 'eigenda_churner' + +This documentation was automatically generated at time `2024-11-26T14:29:13-06:00` + +There are a total of `2` registered metrics. + +--- + +## latency_ms + +latency summary in milliseconds + +| | | +|---|---| +| **Name** | `latency` | +| **Unit** | `ms` | +| **Labels** | `method` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.950`, `0.990` | +| **Fully Qualified Name** | `eigenda_churner_latency_ms` | +--- + +## request_count + +the number of requests + +| | | +|---|---| +| **Name** | `request` | +| **Unit** | `count` | +| **Labels** | `status`, `method`, `reason` | +| **Type** | `counter` | +| **Fully Qualified Name** | `eigenda_churner_request_count` | diff --git a/operators/churner/churner_test.go b/operators/churner/churner_test.go index 0854de749..35c026977 100644 --- a/operators/churner/churner_test.go +++ b/operators/churner/churner_test.go @@ -28,7 +28,8 @@ func TestProcessChurnRequest(t *testing.T) { NumRetries: numRetries, }, } - metrics := churner.NewMetrics("9001", logger) + metrics, err := churner.NewMetrics(9001, logger) + assert.NoError(t, err) cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics) assert.NoError(t, err) assert.NotNil(t, cn) diff --git a/operators/churner/cmd/main.go b/operators/churner/cmd/main.go index a9ecd76e8..33a8c422d 100644 --- a/operators/churner/cmd/main.go +++ b/operators/churner/cmd/main.go @@ -86,7 +86,10 @@ func run(ctx *cli.Context) error { logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) indexer := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) - metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger) + metrics, err := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger) + if err != nil { + log.Fatalf("failed to create metrics: %v", err) + } cn, err := churner.NewChurner(config, indexer, tx, logger, metrics) if err != nil { diff --git a/operators/churner/config.go b/operators/churner/config.go index d35a72121..fe2b1735a 100644 --- a/operators/churner/config.go +++ b/operators/churner/config.go @@ -37,7 +37,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { PerPublicKeyRateLimit: ctx.GlobalDuration(flags.PerPublicKeyRateLimit.Name), ChurnApprovalInterval: ctx.GlobalDuration(flags.ChurnApprovalInterval.Name), MetricsConfig: MetricsConfig{ - HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), + HTTPPort: ctx.GlobalInt(flags.MetricsHTTPPort.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name), }, }, nil diff --git a/operators/churner/flags/flags.go b/operators/churner/flags/flags.go index 906096c49..507a35276 100644 --- a/operators/churner/flags/flags.go +++ b/operators/churner/flags/flags.go @@ -58,11 +58,11 @@ var ( EnvVar: common.PrefixEnvVar(envPrefix, "ENABLE_METRICS"), } /* Optional Flags*/ - MetricsHTTPPort = cli.StringFlag{ + MetricsHTTPPort = cli.IntFlag{ Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"), Usage: "the http port which the metrics prometheus server is listening", Required: false, - Value: "9100", + Value: 9100, EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"), } ChurnApprovalInterval = cli.DurationFlag{ diff --git a/operators/churner/mdoc/main.go b/operators/churner/mdoc/main.go new file mode 100644 index 000000000..db022d61f --- /dev/null +++ b/operators/churner/mdoc/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/operators/churner" +) + +// main generates documentation for churner metrics. +func main() { + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + if err != nil { + panic(err) + } + + metrics, err := churner.NewMetrics(0, logger) + if err != nil { + panic(err) + } + + err = metrics.WriteMetricsDocumentation() + if err != nil { + panic(err) + } +} diff --git a/operators/churner/metrics.go b/operators/churner/metrics.go index 2cece57ad..1f586e7ef 100644 --- a/operators/churner/metrics.go +++ b/operators/churner/metrics.go @@ -1,15 +1,12 @@ package churner import ( - "context" - "fmt" - "net/http" + "github.com/Layr-Labs/eigenda/common/metrics" + "time" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc/codes" ) @@ -28,7 +25,7 @@ const ( ) // Note: statusCodeMap must be maintained in sync with failure reason constants. -var statusCodeMap map[FailReason]string = map[FailReason]string{ +var statusCodeMap = map[FailReason]string{ FailReasonRateLimitExceeded: codes.ResourceExhausted.String(), FailReasonInsufficientStakeToRegister: codes.InvalidArgument.String(), FailReasonInsufficientStakeToChurn: codes.InvalidArgument.String(), @@ -40,63 +37,80 @@ var statusCodeMap map[FailReason]string = map[FailReason]string{ } type MetricsConfig struct { - HTTPPort string + HTTPPort int EnableMetrics bool } type Metrics struct { - registry *prometheus.Registry + metricsServer metrics.Metrics - NumRequests *prometheus.CounterVec - Latency *prometheus.SummaryVec + numRequests metrics.CountMetric + latency metrics.LatencyMetric - httpPort string - logger logging.Logger + logger logging.Logger } -func NewMetrics(httpPort string, logger logging.Logger) *Metrics { - namespace := "eigenda_churner" +type latencyLabel struct { + method string +} + +type numRequestsLabel struct { + status string + method string + reason string +} + +func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) { reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) - metrics := &Metrics{ - NumRequests: promauto.With(reg).NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "requests", - Help: "the number of requests", - }, - []string{"status", "reason", "method"}, - ), - Latency: promauto.With(reg).NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: "latency_ms", - Help: "latency summary in milliseconds", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, - }, - []string{"method"}, - ), - registry: reg, - httpPort: httpPort, - logger: logger.With("component", "ChurnerMetrics"), + metricsServer := metrics.NewMetrics(logger, &metrics.Config{ + Namespace: "eigenda_churner", + HTTPPort: httpPort, + }) + + numRequests, err := metricsServer.NewCountMetric( + "request", + "the number of requests", + numRequestsLabel{}) + if err != nil { + return nil, err } - return metrics + + latency, err := metricsServer.NewLatencyMetric( + "latency", + "latency summary in milliseconds", + latencyLabel{}, + &metrics.Quantile{Quantile: 0.5, Error: 0.05}, + &metrics.Quantile{Quantile: 0.9, Error: 0.01}, + &metrics.Quantile{Quantile: 0.95, Error: 0.01}, + &metrics.Quantile{Quantile: 0.99, Error: 0.001}) + if err != nil { + return nil, err + } + + return &Metrics{ + metricsServer: metricsServer, + numRequests: numRequests, + latency: latency, + logger: logger.With("component", "ChurnerMetrics"), + }, nil } -// ObserveLatency observes the latency of a stage in 'stage -func (g *Metrics) ObserveLatency(method string, latencyMs float64) { - g.Latency.WithLabelValues(method).Observe(latencyMs) +// WriteMetricsDocumentation writes the metrics for the churner to a markdown file. +func (g *Metrics) WriteMetricsDocumentation() error { + return g.metricsServer.WriteMetricsDocumentation("operators/churner/churner-metrics.md") +} + +// ObserveLatency observes the latency of a stage +func (g *Metrics) ObserveLatency(method string, latency time.Duration) { + g.latency.ReportLatency(latency, latencyLabel{method: method}) } // IncrementSuccessfulRequestNum increments the number of successful requests func (g *Metrics) IncrementSuccessfulRequestNum(method string) { - g.NumRequests.With(prometheus.Labels{ - "status": "success", - "method": method, - "reason": "", - }).Inc() + g.numRequests.Increment(numRequestsLabel{status: "success", method: method}) } // IncrementFailedRequestNum increments the number of failed requests @@ -108,25 +122,11 @@ func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) { // handle a negligence of mapping from failure reason to status code. code = codes.Internal.String() } - g.NumRequests.With(prometheus.Labels{ - "status": code, - "reason": string(reason), - "method": method, - }).Inc() + + g.numRequests.Increment(numRequestsLabel{status: code, reason: string(reason), method: method}) } // Start starts the metrics server -func (g *Metrics) Start(ctx context.Context) { - g.logger.Info("Starting metrics server at ", "port", g.httpPort) - addr := fmt.Sprintf(":%s", g.httpPort) - go func() { - log := g.logger - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor( - g.registry, - promhttp.HandlerOpts{}, - )) - err := http.ListenAndServe(addr, mux) - log.Error("Prometheus server failed", "err", err) - }() +func (g *Metrics) Start() error { + return g.metricsServer.Start() } diff --git a/operators/churner/server.go b/operators/churner/server.go index fc8ba9310..83f62bf7f 100644 --- a/operators/churner/server.go +++ b/operators/churner/server.go @@ -47,8 +47,12 @@ func NewServer( func (s *Server) Start(metricsConfig MetricsConfig) error { // Enable Metrics Block if metricsConfig.EnableMetrics { - httpSocket := fmt.Sprintf(":%s", metricsConfig.HTTPPort) - s.metrics.Start(context.Background()) + httpSocket := fmt.Sprintf(":%d", metricsConfig.HTTPPort) + err := s.metrics.Start() + if err != nil { + return fmt.Errorf("failed to start metrics server: %w", err) + } + s.logger.Info("Enabled metrics for Churner", "socket", httpSocket) } return nil @@ -62,7 +66,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl } timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { - s.metrics.ObserveLatency("Churn", f*1000) // make milliseconds + s.metrics.ObserveLatency("Churn", time.Duration(f*float64(time.Second))) })) defer timer.ObserveDuration() s.logger.Info("Received request: ", "QuorumIds", req.GetQuorumIds()) diff --git a/operators/churner/server_test.go b/operators/churner/server_test.go index 5c7c471b7..e2b0fb6a2 100644 --- a/operators/churner/server_test.go +++ b/operators/churner/server_test.go @@ -181,7 +181,8 @@ func newTestServer(t *testing.T) *churner.Server { setupMockWriter() - metrics := churner.NewMetrics("9001", logger) + metrics, err := churner.NewMetrics(9001, logger) + assert.NoError(t, err) cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics) if err != nil { log.Fatalln("cannot create churner", err) diff --git a/operators/churner/tests/churner_test.go b/operators/churner/tests/churner_test.go index ba9f11c52..5625c537c 100644 --- a/operators/churner/tests/churner_test.go +++ b/operators/churner/tests/churner_test.go @@ -225,7 +225,8 @@ func newTestServer(t *testing.T) *churner.Server { ) assert.NoError(t, err) - metrics := churner.NewMetrics("9001", logger) + metrics, err := churner.NewMetrics(9001, logger) + assert.NoError(t, err) cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger, metrics) assert.NoError(t, err)