diff --git a/disperser/cmd/encoder/main.go b/disperser/cmd/encoder/main.go index d546291da..9c90d6bf7 100644 --- a/disperser/cmd/encoder/main.go +++ b/disperser/cmd/encoder/main.go @@ -10,8 +10,10 @@ import ( "github.com/Layr-Labs/eigenda/common/aws/s3" "github.com/Layr-Labs/eigenda/disperser/cmd/encoder/flags" blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/Layr-Labs/eigenda/disperser/encoder" "github.com/Layr-Labs/eigenda/encoding/kzg/prover" + "github.com/prometheus/client_golang/prometheus" "github.com/Layr-Labs/eigenda/relay/chunkstore" "github.com/urfave/cli" ) @@ -56,11 +58,15 @@ func RunEncoderServer(ctx *cli.Context) error { return err } - metrics := encoder.NewMetrics(config.MetricsConfig.HTTPPort, logger) + reg := prometheus.NewRegistry() + metrics := encoder.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger) + grpcMetrics := grpcprom.NewServerMetrics() if config.MetricsConfig.EnableMetrics { httpSocket := fmt.Sprintf(":%s", config.MetricsConfig.HTTPPort) metrics.Start(context.Background()) logger.Info("Enabled metrics for Encoder", "socket", httpSocket) + + reg.MustRegister(grpcMetrics) } if config.EncoderVersion == V2 { @@ -100,7 +106,7 @@ func RunEncoderServer(ctx *cli.Context) error { return fmt.Errorf("failed to create encoder: %w", err) } - server := encoder.NewEncoderServer(*config.ServerConfig, logger, prover, metrics) + server := encoder.NewEncoderServer(*config.ServerConfig, logger, prover, metrics, grpcMetrics) return server.Start() diff --git a/disperser/encoder/metrics.go b/disperser/encoder/metrics.go index 11ba438b5..d4c4d8868 100644 --- a/disperser/encoder/metrics.go +++ b/disperser/encoder/metrics.go @@ -27,10 +27,11 @@ type Metrics struct { BlobSizeTotal *prometheus.CounterVec Latency *prometheus.SummaryVec BlobQueue *prometheus.GaugeVec + QueueCapacity prometheus.Gauge + QueueUtilization prometheus.Gauge } -func NewMetrics(httpPort string, logger logging.Logger) *Metrics { - reg := prometheus.NewRegistry() +func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics { reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) @@ -71,6 +72,20 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics { }, []string{"size_bucket"}, ), + QueueCapacity: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: "eigenda_encoder", + Name: "request_pool_capacity", + Help: "The maximum capacity of the request pool", + }, + ), + QueueUtilization: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: "eigenda_encoder", + Name: "request_pool_utilization", + Help: "Current utilization of request pool (total across all buckets)", + }, + ), } } @@ -107,9 +122,16 @@ func (m *Metrics) ObserveLatency(stage string, duration time.Duration) { } func (m *Metrics) ObserveQueue(queueStats map[string]int) { + total := 0 for bucket, num := range queueStats { m.BlobQueue.With(prometheus.Labels{"size_bucket": bucket}).Set(float64(num)) + total += num } + m.QueueUtilization.Set(float64(total)) +} + +func (m *Metrics) SetQueueCapacity(capacity int) { + m.QueueCapacity.Set(float64(capacity)) } func (m *Metrics) Start(ctx context.Context) { diff --git a/disperser/encoder/server.go b/disperser/encoder/server.go index 18a7ad43e..4eb0c39f6 100644 --- a/disperser/encoder/server.go +++ b/disperser/encoder/server.go @@ -12,6 +12,7 @@ import ( "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/disperser" pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/Layr-Labs/eigenda/disperser/common" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" @@ -26,6 +27,7 @@ type EncoderServer struct { logger logging.Logger prover encoding.Prover metrics *Metrics + grpcMetrics *grpcprom.ServerMetrics close func() runningRequests chan struct{} @@ -39,12 +41,16 @@ type blobRequest struct { blobSizeByte int } -func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics) *EncoderServer { +func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics, grpcMetrics *grpcprom.ServerMetrics) *EncoderServer { + // Set initial queue capacity metric + metrics.SetQueueCapacity(config.RequestPoolSize) + return &EncoderServer{ config: config, logger: logger.With("component", "EncoderServer"), prover: prover, metrics: metrics, + grpcMetrics: grpcMetrics, runningRequests: make(chan struct{}, config.MaxConcurrentRequests), requestPool: make(chan blobRequest, config.RequestPoolSize), @@ -61,9 +67,14 @@ func (s *EncoderServer) Start() error { } opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB - gs := grpc.NewServer(opt) + gs := grpc.NewServer(opt, + grpc.UnaryInterceptor( + s.grpcMetrics.UnaryServerInterceptor(), + ), + ) reflection.Register(gs) pb.RegisterEncoderServer(gs, s) + s.grpcMetrics.InitializeMetrics(gs) // Register Server for Health Checks name := pb.Encoder_ServiceDesc.ServiceName @@ -91,10 +102,14 @@ func (s *EncoderServer) Close() { func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) { startTime := time.Now() blobSize := len(req.GetData()) + sizeBucket := common.BlobSizeBucket(blobSize) + + + select { case s.requestPool <- blobRequest{blobSizeByte: blobSize}: s.queueLock.Lock() - s.queueStats[common.BlobSizeBucket(blobSize)]++ + s.queueStats[sizeBucket]++ s.metrics.ObserveQueue(s.queueStats) s.queueLock.Unlock() default: @@ -102,6 +117,7 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests) return nil, errors.New("too many requests") } + s.runningRequests <- struct{}{} defer s.popRequest() diff --git a/disperser/encoder/server_test.go b/disperser/encoder/server_test.go index 9aa12a728..0b2ba4da2 100644 --- a/disperser/encoder/server_test.go +++ b/disperser/encoder/server_test.go @@ -12,6 +12,7 @@ import ( "github.com/consensys/gnark-crypto/ecc/bn254" "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -109,8 +110,8 @@ func getTestData() (core.Blob, encoding.EncodingParams) { } func newEncoderTestServer(t *testing.T) *EncoderServer { - metrics := NewMetrics("9000", logger) - return NewEncoderServer(testServerConfig, logger, testProver, metrics) + metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger) + return NewEncoderServer(testServerConfig, logger, testProver, metrics, nil) } func TestEncodeBlob(t *testing.T) { @@ -179,7 +180,7 @@ func TestThrottling(t *testing.T) { lengthCommitment = lengthProof - metrics := NewMetrics("9000", logger) + metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger) concurrentRequests := 2 requestPoolSize := 4 encoder := &encmock.MockEncoder{ @@ -202,7 +203,7 @@ func TestThrottling(t *testing.T) { MaxConcurrentRequests: concurrentRequests, RequestPoolSize: requestPoolSize, } - s := NewEncoderServer(encoderServerConfig, logger, encoder, metrics) + s := NewEncoderServer(encoderServerConfig, logger, encoder, metrics, nil) testBlobData, testEncodingParams := getTestData() testEncodingParamsProto := &pb.EncodingParams{ @@ -254,8 +255,8 @@ func TestThrottling(t *testing.T) { func TestEncoderPointsLoading(t *testing.T) { // encoder 1 only loads 1500 points prover1, config1 := makeTestProver(1500) - metrics := NewMetrics("9000", logger) - server1 := NewEncoderServer(config1, logger, prover1, metrics) + metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger) + server1 := NewEncoderServer(config1, logger, prover1, metrics, nil) testBlobData, testEncodingParams := getTestData() @@ -299,7 +300,7 @@ func TestEncoderPointsLoading(t *testing.T) { // encoder 2 only loads 2900 points encoder2, config2 := makeTestProver(2900) - server2 := NewEncoderServer(config2, logger, encoder2, metrics) + server2 := NewEncoderServer(config2, logger, encoder2, metrics, nil) reply2, err := server2.EncodeBlob(context.Background(), encodeBlobRequestProto) assert.NoError(t, err) diff --git a/disperser/encoder/server_v2_test.go b/disperser/encoder/server_v2_test.go index 26c7110f7..57a850b2c 100644 --- a/disperser/encoder/server_v2_test.go +++ b/disperser/encoder/server_v2_test.go @@ -18,6 +18,7 @@ import ( "github.com/Layr-Labs/eigenda/encoding/kzg/prover" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/relay/chunkstore" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" @@ -196,7 +197,7 @@ func createTestComponents(t *testing.T) *testComponents { t.Helper() prover, err := makeTestProver(300000) require.NoError(t, err, "Failed to create prover") - metrics := encoder.NewMetrics("9000", logger) + metrics := encoder.NewMetrics(prometheus.NewRegistry(), "9000", logger) s3Client := mock.NewS3Client() dynamoDBClient := &mock.MockDynamoDBClient{} blobStore := blobstore.NewBlobStore(s3BucketName, s3Client, logger) diff --git a/test/integration_test.go b/test/integration_test.go index 72caeb8fb..aebb546f4 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -173,12 +173,12 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser } p0, _ := mustMakeTestComponents() - metrics := encoder.NewMetrics("9000", logger) + metrics := encoder.NewMetrics(prometheus.NewRegistry(), "9000", logger) grpcEncoder := encoder.NewEncoderServer(encoder.ServerConfig{ GrpcPort: encoderPort, MaxConcurrentRequests: 16, RequestPoolSize: 32, - }, logger, p0, metrics) + }, logger, p0, metrics, grpcprom.NewServerMetrics()) encoderClient, err := encoder.NewEncoderClient(batcherConfig.EncoderSocket, 10*time.Second) if err != nil {