Skip to content

Commit

Permalink
Added GetBlob metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Nov 27, 2024
1 parent 671f0c8 commit 4adb7ea
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 26 deletions.
15 changes: 14 additions & 1 deletion relay/limiter/blob_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package limiter

import (
"fmt"
"github.com/Layr-Labs/eigenda/common/metrics"
"golang.org/x/time/rate"
"sync"
"time"
Expand All @@ -23,12 +24,15 @@ type BlobRateLimiter struct {
// operationsInFlight is the number of GetBlob operations currently in flight.
operationsInFlight int

// limitCounter is used to track rate limiting events, ignored if nil
limitCounter metrics.CountMetric

// this lock is used to provide thread safety
lock sync.Mutex
}

// NewBlobRateLimiter creates a new BlobRateLimiter.
func NewBlobRateLimiter(config *Config) *BlobRateLimiter {
func NewBlobRateLimiter(config *Config, limiterCounter metrics.CountMetric) *BlobRateLimiter {
globalGetBlobOpLimiter := rate.NewLimiter(
rate.Limit(config.MaxGetBlobOpsPerSecond),
config.GetBlobOpsBurstiness)
Expand Down Expand Up @@ -57,10 +61,16 @@ func (l *BlobRateLimiter) BeginGetBlobOperation(now time.Time) error {
defer l.lock.Unlock()

if l.operationsInFlight >= l.config.MaxConcurrentGetBlobOps {
if l.limitCounter != nil {
l.limitCounter.Increment(RateLimitLabel{"global concurrency"})
}
return fmt.Errorf("global concurrent request limit %d exceeded for getBlob operations, try again later",
l.config.MaxConcurrentGetBlobOps)
}
if l.opLimiter.TokensAt(now) < 1 {
if l.limitCounter != nil {
l.limitCounter.Increment(RateLimitLabel{"global rate"})
}
return fmt.Errorf("global rate limit %0.1fhz exceeded for getBlob operations, try again later",
l.config.MaxGetBlobOpsPerSecond)
}
Expand Down Expand Up @@ -98,6 +108,9 @@ func (l *BlobRateLimiter) RequestGetBlobBandwidth(now time.Time, bytes uint32) e

allowed := l.bandwidthLimiter.AllowN(now, int(bytes))
if !allowed {
if l.limitCounter != nil {
l.limitCounter.Increment(RateLimitLabel{"global bandwidth"})
}
return fmt.Errorf("global rate limit %dMib/s exceeded for getBlob bandwidth, try again later",
int(l.config.MaxGetBlobBytesPerSecond/1024/1024))
}
Expand Down
6 changes: 3 additions & 3 deletions relay/limiter/blob_rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestConcurrentBlobOperations(t *testing.T) {
// Make the burstiness limit high enough that we won't be rate limited
config.GetBlobOpsBurstiness = concurrencyLimit * 100

limiter := NewBlobRateLimiter(config)
limiter := NewBlobRateLimiter(config, nil)

// time starts at current time, but advances manually afterward
now := time.Now()
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestGetBlobOpRateLimit(t *testing.T) {
config.GetBlobOpsBurstiness = int(config.MaxGetBlobOpsPerSecond) + rand.Intn(10)
config.MaxConcurrentGetBlobOps = 1

limiter := NewBlobRateLimiter(config)
limiter := NewBlobRateLimiter(config, nil)

// time starts at current time, but advances manually afterward
now := time.Now()
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestGetBlobBandwidthLimit(t *testing.T) {
config.MaxGetBlobBytesPerSecond = float64(1024 + rand.Intn(1024*1024))
config.GetBlobBytesBurstiness = int(config.MaxGetBlobBytesPerSecond) + rand.Intn(1024*1024)

limiter := NewBlobRateLimiter(config)
limiter := NewBlobRateLimiter(config, nil)

// time starts at current time, but advances manually afterward
now := time.Now()
Expand Down
4 changes: 2 additions & 2 deletions relay/mdoc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/relay/metrics"
"github.com/Layr-Labs/eigenda/relay"
)

// main generates documentation for relay metrics.
Expand All @@ -12,7 +12,7 @@ func main() {
panic(err)
}

metrics, err := metrics.NewRelayMetrics(logger, 0)
metrics, err := relay.NewRelayMetrics(logger, 0)
if err != nil {
panic(err)
}
Expand Down
69 changes: 67 additions & 2 deletions relay/mdoc/relay-metrics.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
# Metrics Documentation for namespace 'relay'

This documentation was automatically generated at time `2024-11-27T10:08:20-06:00`
This documentation was automatically generated at time `2024-11-27T10:22:37-06:00`

There are a total of `8` registered metrics.
There are a total of `13` registered metrics.

---

## average_get_blob_data_bytes

Average data size of requested blobs

| | |
|---|---|
| **Name** | `average_get_blob_data` |
| **Unit** | `bytes` |
| **Type** | `running average` |
| **Time Window** | `1m0s` |
| **Fully Qualified Name** | `relay_average_get_blob_data_bytes` |
---

## average_get_chunks_data_bytes

Average data size in a GetChunks request
Expand All @@ -32,6 +45,58 @@ Average number of keys in a GetChunks request
| **Fully Qualified Name** | `relay_average_get_chunks_key_count` |
---

## get_blob_data_latency_ms

Latency of the GetBlob RPC data retrieval

| | |
|---|---|
| **Name** | `get_blob_data_latency` |
| **Unit** | `ms` |
| **Type** | `latency` |
| **Quantiles** | `0.500`, `0.900`, `0.990` |
| **Fully Qualified Name** | `relay_get_blob_data_latency_ms` |
---

## get_blob_latency_ms

Latency of the GetBlob RPC

| | |
|---|---|
| **Name** | `get_blob_latency` |
| **Unit** | `ms` |
| **Type** | `latency` |
| **Quantiles** | `0.500`, `0.900`, `0.990` |
| **Fully Qualified Name** | `relay_get_blob_latency_ms` |
---

## get_blob_metadata_latency_ms

Latency of the GetBlob RPC metadata retrieval

| | |
|---|---|
| **Name** | `get_blob_metadata_latency` |
| **Unit** | `ms` |
| **Type** | `latency` |
| **Quantiles** | `0.500`, `0.900`, `0.990` |
| **Fully Qualified Name** | `relay_get_blob_metadata_latency_ms` |
---

## get_blob_rate_limited_count

Number of GetBlob RPCs that were rate limited

| | |
|---|---|
| **Name** | `get_blob_rate_limited` |
| **Unit** | `count` |
| **Labels** | `reason` |
| **Type** | `counter` |
| **Fully Qualified Name** | `relay_get_blob_rate_limited_count` |
---

## get_chunks_auth_failure_count

Number of GetChunks RPC authentication failures
Expand Down
62 changes: 61 additions & 1 deletion relay/metrics/relay_metrics.go → relay/relay_metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics
package relay

import (
"github.com/Layr-Labs/eigenda/common/metrics"
Expand All @@ -13,6 +13,9 @@ type RelayMetrics struct {
metricsServer metrics.Metrics
grpcServerOption grpc.ServerOption

// TODO (after cache changes merge): add metrics for cache

// GetChunks metrics
GetChunksLatency metrics.LatencyMetric
GetChunksAuthenticationLatency metrics.LatencyMetric
GetChunksMetadataLatency metrics.LatencyMetric
Expand All @@ -21,6 +24,13 @@ type RelayMetrics struct {
GetChunksRateLimited metrics.CountMetric
GetChunksAverageKeyCount metrics.RunningAverageMetric
GetChunksAverageDataSize metrics.RunningAverageMetric

// GetBlob metrics
GetBlobLatency metrics.LatencyMetric
GetBlobMetadataLatency metrics.LatencyMetric
GetBlobDataLatency metrics.LatencyMetric
GetBlobRateLimited metrics.CountMetric
GetBlobAverageDataSize metrics.RunningAverageMetric
}

// NewRelayMetrics creates a new RelayMetrics instance, which encapsulates all metrics related to the relay.
Expand Down Expand Up @@ -112,6 +122,51 @@ func NewRelayMetrics(logger logging.Logger, port int) (*RelayMetrics, error) {
return nil, err
}

getBlobLatencyMetric, err := server.NewLatencyMetric(
"get_blob_latency",
"Latency of the GetBlob RPC",
nil,
standardQuantiles...)
if err != nil {
return nil, err
}

getBlobMetadataLatencyMetric, err := server.NewLatencyMetric(
"get_blob_metadata_latency",
"Latency of the GetBlob RPC metadata retrieval",
nil,
standardQuantiles...)
if err != nil {
return nil, err
}

getBlobDataLatencyMetric, err := server.NewLatencyMetric(
"get_blob_data_latency",
"Latency of the GetBlob RPC data retrieval",
nil,
standardQuantiles...)
if err != nil {
return nil, err
}

getBlobRateLimited, err := server.NewCountMetric(
"get_blob_rate_limited",
"Number of GetBlob RPC rate limited",
limiter.RateLimitLabel{})
if err != nil {
return nil, err
}

getBlobAverageDataSize, err := server.NewRunningAverageMetric(
"average_get_blob_data",
"bytes",
"Average data size of requested blobs",
time.Minute,
nil)
if err != nil {
return nil, err
}

return &RelayMetrics{
metricsServer: server,
grpcServerOption: grpcServerOption,
Expand All @@ -123,6 +178,11 @@ func NewRelayMetrics(logger logging.Logger, port int) (*RelayMetrics, error) {
GetChunksRateLimited: getChunksRateLimited,
GetChunksAverageKeyCount: getChunksAverageKeyCount,
GetChunksAverageDataSize: getChunksAverageDataSize,
GetBlobLatency: getBlobLatencyMetric,
GetBlobMetadataLatency: getBlobMetadataLatencyMetric,
GetBlobDataLatency: getBlobDataLatencyMetric,
GetBlobRateLimited: getBlobRateLimited,
GetBlobAverageDataSize: getBlobAverageDataSize,
}, nil
}

Expand Down
Loading

0 comments on commit 4adb7ea

Please sign in to comment.