Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional queue metrics #919

Merged
merged 5 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions disperser/cmd/encoder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigenda/disperser/cmd/encoder/flags"
blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/Layr-Labs/eigenda/disperser/encoder"
"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
"github.com/prometheus/client_golang/prometheus"
"github.com/Layr-Labs/eigenda/relay/chunkstore"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -56,11 +58,15 @@ func RunEncoderServer(ctx *cli.Context) error {
return err
}

metrics := encoder.NewMetrics(config.MetricsConfig.HTTPPort, logger)
reg := prometheus.NewRegistry()
metrics := encoder.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger)
grpcMetrics := grpcprom.NewServerMetrics()
if config.MetricsConfig.EnableMetrics {
httpSocket := fmt.Sprintf(":%s", config.MetricsConfig.HTTPPort)
metrics.Start(context.Background())
logger.Info("Enabled metrics for Encoder", "socket", httpSocket)

reg.MustRegister(grpcMetrics)
}

if config.EncoderVersion == V2 {
Expand Down Expand Up @@ -100,7 +106,7 @@ func RunEncoderServer(ctx *cli.Context) error {
return fmt.Errorf("failed to create encoder: %w", err)
}

server := encoder.NewEncoderServer(*config.ServerConfig, logger, prover, metrics)
server := encoder.NewEncoderServer(*config.ServerConfig, logger, prover, metrics, grpcMetrics)

return server.Start()

Expand Down
26 changes: 24 additions & 2 deletions disperser/encoder/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ type Metrics struct {
BlobSizeTotal *prometheus.CounterVec
Latency *prometheus.SummaryVec
BlobQueue *prometheus.GaugeVec
QueueCapacity prometheus.Gauge
QueueUtilization prometheus.Gauge
}

func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
reg := prometheus.NewRegistry()
func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics {
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

Expand Down Expand Up @@ -71,6 +72,20 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
},
[]string{"size_bucket"},
),
QueueCapacity: promauto.With(reg).NewGauge(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is configured, so doesn't look it should be exported via metric again.

Utilization is a division of queue size (already exported) by capacity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's convenient for visualizing the capacity on grafana dashboard. If we change the capacity it should also change on the dashboard.

prometheus.GaugeOpts{
Namespace: "eigenda_encoder",
Name: "request_pool_capacity",
Help: "The maximum capacity of the request pool",
},
),
QueueUtilization: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: "eigenda_encoder",
Name: "request_pool_utilization",
Help: "Current utilization of request pool (total across all buckets)",
},
),
}
}

Expand Down Expand Up @@ -107,9 +122,16 @@ func (m *Metrics) ObserveLatency(stage string, duration time.Duration) {
}

// ObserveQueue records the current depth of the encoding request queue.
// It sets the per-size-bucket gauge for every bucket present in queueStats
// and publishes the total number of queued requests across all buckets as
// the pool-utilization gauge.
func (m *Metrics) ObserveQueue(queueStats map[string]int) {
	sum := 0
	for sizeBucket, count := range queueStats {
		sum += count
		m.BlobQueue.With(prometheus.Labels{"size_bucket": sizeBucket}).Set(float64(count))
	}
	m.QueueUtilization.Set(float64(sum))
}

// SetQueueCapacity publishes the configured maximum size of the request
// pool as a gauge, so dashboards can plot utilization against capacity
// and reflect configuration changes automatically.
func (m *Metrics) SetQueueCapacity(capacity int) {
	v := float64(capacity)
	m.QueueCapacity.Set(v)
}

func (m *Metrics) Start(ctx context.Context) {
Expand Down
22 changes: 19 additions & 3 deletions disperser/encoder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/disperser"
pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/Layr-Labs/eigenda/disperser/common"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand All @@ -26,6 +27,7 @@ type EncoderServer struct {
logger logging.Logger
prover encoding.Prover
metrics *Metrics
grpcMetrics *grpcprom.ServerMetrics
close func()

runningRequests chan struct{}
Expand All @@ -39,12 +41,16 @@ type blobRequest struct {
blobSizeByte int
}

func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics) *EncoderServer {
func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics, grpcMetrics *grpcprom.ServerMetrics) *EncoderServer {
// Set initial queue capacity metric
metrics.SetQueueCapacity(config.RequestPoolSize)

return &EncoderServer{
config: config,
logger: logger.With("component", "EncoderServer"),
prover: prover,
metrics: metrics,
grpcMetrics: grpcMetrics,

runningRequests: make(chan struct{}, config.MaxConcurrentRequests),
requestPool: make(chan blobRequest, config.RequestPoolSize),
Expand All @@ -61,9 +67,14 @@ func (s *EncoderServer) Start() error {
}

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt,
grpc.UnaryInterceptor(
s.grpcMetrics.UnaryServerInterceptor(),
),
)
reflection.Register(gs)
pb.RegisterEncoderServer(gs, s)
s.grpcMetrics.InitializeMetrics(gs)

// Register Server for Health Checks
name := pb.Encoder_ServiceDesc.ServiceName
Expand Down Expand Up @@ -91,17 +102,22 @@ func (s *EncoderServer) Close() {
func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) {
startTime := time.Now()
blobSize := len(req.GetData())
sizeBucket := common.BlobSizeBucket(blobSize)



select {
case s.requestPool <- blobRequest{blobSizeByte: blobSize}:
s.queueLock.Lock()
s.queueStats[common.BlobSizeBucket(blobSize)]++
s.queueStats[sizeBucket]++
s.metrics.ObserveQueue(s.queueStats)
s.queueLock.Unlock()
default:
s.metrics.IncrementRateLimitedBlobRequestNum(len(req.GetData()))
s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests)
return nil, errors.New("too many requests")
}

s.runningRequests <- struct{}{}
defer s.popRequest()

Expand Down
15 changes: 8 additions & 7 deletions disperser/encoder/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand Down Expand Up @@ -109,8 +110,8 @@ func getTestData() (core.Blob, encoding.EncodingParams) {
}

func newEncoderTestServer(t *testing.T) *EncoderServer {
metrics := NewMetrics("9000", logger)
return NewEncoderServer(testServerConfig, logger, testProver, metrics)
metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger)
return NewEncoderServer(testServerConfig, logger, testProver, metrics, nil)
}

func TestEncodeBlob(t *testing.T) {
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestThrottling(t *testing.T) {

lengthCommitment = lengthProof

metrics := NewMetrics("9000", logger)
metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger)
concurrentRequests := 2
requestPoolSize := 4
encoder := &encmock.MockEncoder{
Expand All @@ -202,7 +203,7 @@ func TestThrottling(t *testing.T) {
MaxConcurrentRequests: concurrentRequests,
RequestPoolSize: requestPoolSize,
}
s := NewEncoderServer(encoderServerConfig, logger, encoder, metrics)
s := NewEncoderServer(encoderServerConfig, logger, encoder, metrics, nil)
testBlobData, testEncodingParams := getTestData()

testEncodingParamsProto := &pb.EncodingParams{
Expand Down Expand Up @@ -254,8 +255,8 @@ func TestThrottling(t *testing.T) {
func TestEncoderPointsLoading(t *testing.T) {
// encoder 1 only loads 1500 points
prover1, config1 := makeTestProver(1500)
metrics := NewMetrics("9000", logger)
server1 := NewEncoderServer(config1, logger, prover1, metrics)
metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger)
server1 := NewEncoderServer(config1, logger, prover1, metrics, nil)

testBlobData, testEncodingParams := getTestData()

Expand Down Expand Up @@ -299,7 +300,7 @@ func TestEncoderPointsLoading(t *testing.T) {

// encoder 2 only loads 2900 points
encoder2, config2 := makeTestProver(2900)
server2 := NewEncoderServer(config2, logger, encoder2, metrics)
server2 := NewEncoderServer(config2, logger, encoder2, metrics, nil)

reply2, err := server2.EncodeBlob(context.Background(), encodeBlobRequestProto)
assert.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion disperser/encoder/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigenda/relay/chunkstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -196,7 +197,7 @@ func createTestComponents(t *testing.T) *testComponents {
t.Helper()
prover, err := makeTestProver(300000)
require.NoError(t, err, "Failed to create prover")
metrics := encoder.NewMetrics("9000", logger)
metrics := encoder.NewMetrics(prometheus.NewRegistry(), "9000", logger)
s3Client := mock.NewS3Client()
dynamoDBClient := &mock.MockDynamoDBClient{}
blobStore := blobstore.NewBlobStore(s3BucketName, s3Client, logger)
Expand Down
4 changes: 2 additions & 2 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
}

p0, _ := mustMakeTestComponents()
metrics := encoder.NewMetrics("9000", logger)
metrics := encoder.NewMetrics(prometheus.NewRegistry(), "9000", logger)
grpcEncoder := encoder.NewEncoderServer(encoder.ServerConfig{
GrpcPort: encoderPort,
MaxConcurrentRequests: 16,
RequestPoolSize: 32,
}, logger, p0, metrics)
}, logger, p0, metrics, grpcprom.NewServerMetrics())

encoderClient, err := encoder.NewEncoderClient(batcherConfig.EncoderSocket, 10*time.Second)
if err != nil {
Expand Down
Loading