Skip to content

Commit

Permalink
Incremental progress.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Nov 27, 2024
1 parent 3b4637e commit 60f015e
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 18 deletions.
31 changes: 30 additions & 1 deletion relay/limiter/chunk_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package limiter

import (
"fmt"
"github.com/Layr-Labs/eigenda/common/metrics"
"golang.org/x/time/rate"
"sync"
"time"
Expand Down Expand Up @@ -36,12 +37,17 @@ type ChunkRateLimiter struct {
// perClientOperationsInFlight is the number of GetChunk operations currently in flight for each client.
perClientOperationsInFlight map[string]int

// limiterCounter is used to track rate limiting events, ignored if nil
limiterCounter metrics.CountMetric

// this lock is used to provide thread safety
lock sync.Mutex
}

// NewChunkRateLimiter creates a new ChunkRateLimiter.
func NewChunkRateLimiter(config *Config) *ChunkRateLimiter {
func NewChunkRateLimiter(
config *Config,
limiterCounter metrics.CountMetric) *ChunkRateLimiter {

globalOpLimiter := rate.NewLimiter(rate.Limit(
config.MaxGetChunkOpsPerSecond),
Expand All @@ -58,9 +64,14 @@ func NewChunkRateLimiter(config *Config) *ChunkRateLimiter {
perClientOpLimiter: make(map[string]*rate.Limiter),
perClientBandwidthLimiter: make(map[string]*rate.Limiter),
perClientOperationsInFlight: make(map[string]int),
limiterCounter: limiterCounter,
}
}

// RateLimitLabel is the label passed to the rate-limit counter's Increment call whenever the
// ChunkRateLimiter rejects a request. It is also supplied (as a zero value) when the
// "get_chunks_rate_limited" count metric is created.
// NOTE(review): the field is unexported; confirm the metrics package can still read it when
// deriving label values from outside this package.
type RateLimitLabel struct {
// reason names the limit that was hit, e.g. "global concurrency", "global rate",
// "client concurrency", "client rate", "global bandwidth" or "client bandwidth".
reason string
}

// BeginGetChunkOperation should be called when a GetChunk operation is about to begin. If it returns an error,
// the operation should not be performed. If it does not return an error, FinishGetChunkOperation should be
// called when the operation completes.
Expand Down Expand Up @@ -90,19 +101,31 @@ func (l *ChunkRateLimiter) BeginGetChunkOperation(
}

if l.globalOperationsInFlight >= l.config.MaxConcurrentGetChunkOps {
if l.limiterCounter != nil {
l.limiterCounter.Increment(RateLimitLabel{"global concurrency"})
}
return fmt.Errorf(
"global concurrent request limit %d exceeded for GetChunks operations, try again later",
l.config.MaxConcurrentGetChunkOps)
}
if l.globalOpLimiter.TokensAt(now) < 1 {
if l.limiterCounter != nil {
l.limiterCounter.Increment(RateLimitLabel{"global rate"})
}
return fmt.Errorf("global rate limit %0.1fhz exceeded for GetChunks operations, try again later",
l.config.MaxGetChunkOpsPerSecond)
}
if l.perClientOperationsInFlight[requesterID] >= l.config.MaxConcurrentGetChunkOpsClient {
if l.limiterCounter != nil {
l.limiterCounter.Increment(RateLimitLabel{"client concurrency"})
}
return fmt.Errorf("client concurrent request limit %d exceeded for GetChunks",
l.config.MaxConcurrentGetChunkOpsClient)
}
if l.perClientOpLimiter[requesterID].TokensAt(now) < 1 {
if l.limiterCounter != nil {
l.limiterCounter.Increment(RateLimitLabel{"client rate"})
}
return fmt.Errorf("client rate limit %0.1fhz exceeded for GetChunks, try again later",
l.config.MaxGetChunkOpsPerSecondClient)
}
Expand Down Expand Up @@ -139,6 +162,9 @@ func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID s

allowed := l.globalBandwidthLimiter.AllowN(now, bytes)
if !allowed {
if l.limiterCounter != nil {
l.limiterCounter.Increment(RateLimitLabel{"global bandwidth"})
}
return fmt.Errorf("global rate limit %dMiB exceeded for GetChunk bandwidth, try again later",
int(l.config.MaxGetChunkBytesPerSecond/1024/1024))
}
Expand All @@ -150,6 +176,9 @@ func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID s
allowed = limiter.AllowN(now, bytes)
if !allowed {
l.globalBandwidthLimiter.AllowN(now, -bytes)
if l.limiterCounter != nil {
l.limiterCounter.Increment(RateLimitLabel{"client bandwidth"})
}
return fmt.Errorf("client rate limit %dMiB exceeded for GetChunk bandwidth, try again later",
int(l.config.MaxGetChunkBytesPerSecondClient/1024/1024))
}
Expand Down
12 changes: 6 additions & 6 deletions relay/limiter/chunk_rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestConcurrentGetChunksOperations(t *testing.T) {

userID := tu.RandomString(64)

limiter := NewChunkRateLimiter(config)
limiter := NewChunkRateLimiter(config, nil)

// time starts at current time, but advances manually afterward
now := time.Now()
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestGetChunksRateLimit(t *testing.T) {

userID := tu.RandomString(64)

limiter := NewChunkRateLimiter(config)
limiter := NewChunkRateLimiter(config, nil)

// time starts at current time, but advances manually afterward
now := time.Now()
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestGetChunksBandwidthLimit(t *testing.T) {

userID := tu.RandomString(64)

limiter := NewChunkRateLimiter(config)
limiter := NewChunkRateLimiter(config, nil)

// time starts at current time, but advances manually afterward
now := time.Now()
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestPerClientConcurrencyLimit(t *testing.T) {
userID1 := tu.RandomString(64)
userID2 := tu.RandomString(64)

limiter := NewChunkRateLimiter(config)
limiter := NewChunkRateLimiter(config, nil)

// time starts at current time, but advances manually afterward
now := time.Now()
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestOpLimitPerClient(t *testing.T) {
userID1 := tu.RandomString(64)
userID2 := tu.RandomString(64)

limiter := NewChunkRateLimiter(config)
limiter := NewChunkRateLimiter(config, nil)

// time starts at current time, but advances manually afterward
now := time.Now()
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestBandwidthLimitPerClient(t *testing.T) {
userID1 := tu.RandomString(64)
userID2 := tu.RandomString(64)

limiter := NewChunkRateLimiter(config)
limiter := NewChunkRateLimiter(config, nil)

// time starts at current time, but advances manually afterward
now := time.Now()
Expand Down
76 changes: 74 additions & 2 deletions relay/metrics/relay_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"github.com/Layr-Labs/eigenda/common/metrics"
"github.com/Layr-Labs/eigenda/relay/limiter"
"github.com/Layr-Labs/eigensdk-go/logging"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"google.golang.org/grpc"
Expand All @@ -10,6 +11,13 @@ import (
// RelayMetrics encapsulates the metrics server used by the relay together with the individual
// GetChunks metrics that are registered on it by NewRelayMetrics.
type RelayMetrics struct {
// metricsServer is the server on which all of the metrics below are registered.
metricsServer metrics.Metrics
// grpcServerOption is the gRPC server option built in NewRelayMetrics.
// NOTE(review): presumably installs the gRPC metrics interceptor — confirm against the constructor.
grpcServerOption grpc.ServerOption

// GetChunksLatency ("get_chunks_latency") is the latency of the GetChunks RPC, measured from
// the start of the handler. It is reported only on the success path.
GetChunksLatency metrics.LatencyMetric
// GetChunksAuthenticationLatency ("get_chunks_authentication_latency") is the latency of
// GetChunks client authentication. It is reported only when an authenticator is configured.
GetChunksAuthenticationLatency metrics.LatencyMetric
// GetChunksMetadataLatency ("get_chunks_metadata_latency") is the latency of GetChunks metadata
// retrieval, measured from the end of authentication to the end of the metadata fetch.
GetChunksMetadataLatency metrics.LatencyMetric
// GetChunksDataLatency ("get_chunks_data_latency") is the latency of GetChunks data retrieval,
// measured from the end of the metadata fetch to the end of the chunk data fetch.
GetChunksDataLatency metrics.LatencyMetric
// GetChunksAuthFailures ("get_chunks_auth_failure") counts GetChunks requests whose
// authentication returned an error.
GetChunksAuthFailures metrics.CountMetric
// GetChunksRateLimited ("get_chunks_rate_limited") counts rate-limited GetChunks requests,
// labeled with limiter.RateLimitLabel. It is handed to the ChunkRateLimiter, which increments it.
GetChunksRateLimited metrics.CountMetric
}

// NewRelayMetrics creates a new RelayMetrics instance, which encapsulates all metrics related to the relay.
Expand All @@ -23,9 +31,73 @@ func NewRelayMetrics(logger logging.Logger, port int) (*RelayMetrics, error) {
grpcMetrics.UnaryServerInterceptor(),
)

standardQuantiles := []*metrics.Quantile{
metrics.NewQuantile(0.5),
metrics.NewQuantile(0.9),
metrics.NewQuantile(0.99),
}

getChunksLatencyMetric, err := server.NewLatencyMetric(
"get_chunks_latency",
"Latency of the GetChunks RPC",
nil,
standardQuantiles...)
if err != nil {
return nil, err
}

getChunksAuthenticationLatencyMetric, err := server.NewLatencyMetric(
"get_chunks_authentication_latency",
"Latency of the GetChunks RPC client authentication",
nil,
standardQuantiles...)
if err != nil {
return nil, err
}

getChunksMetadataLatencyMetric, err := server.NewLatencyMetric(
"get_chunks_metadata_latency",
"Latency of the GetChunks RPC metadata retrieval",
nil,
standardQuantiles...)
if err != nil {
return nil, err
}

getChunksDataLatencyMetric, err := server.NewLatencyMetric(
"get_chunks_data_latency",
"Latency of the GetChunks RPC data retrieval",
nil,
standardQuantiles...)
if err != nil {
return nil, err
}

getChunksAuthFailures, err := server.NewCountMetric(
"get_chunks_auth_failure",
"Number of GetChunks RPC authentication failures",
nil)
if err != nil {
return nil, err
}

getChunksRateLimited, err := server.NewCountMetric(
"get_chunks_rate_limited",
"Number of GetChunks RPC rate limited",
limiter.RateLimitLabel{})
if err != nil {
return nil, err
}

return &RelayMetrics{
metricsServer: server,
grpcServerOption: grpcServerOption,
metricsServer: server,
grpcServerOption: grpcServerOption,
GetChunksLatency: getChunksLatencyMetric,
GetChunksAuthenticationLatency: getChunksAuthenticationLatencyMetric,
GetChunksMetadataLatency: getChunksMetadataLatencyMetric,
GetChunksDataLatency: getChunksDataLatencyMetric,
GetChunksAuthFailures: getChunksAuthFailures,
GetChunksRateLimited: getChunksRateLimited,
}, nil
}

Expand Down
29 changes: 21 additions & 8 deletions relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func NewServer(
blobProvider: bp,
chunkProvider: cp,
blobRateLimiter: limiter.NewBlobRateLimiter(&config.RateLimits),
chunkRateLimiter: limiter.NewChunkRateLimiter(&config.RateLimits),
chunkRateLimiter: limiter.NewChunkRateLimiter(&config.RateLimits, relayMetrics.GetChunksRateLimited),
authenticator: authenticator,
relayMetrics: relayMetrics,
}, nil
Expand Down Expand Up @@ -258,6 +258,8 @@ func (s *Server) GetBlob(ctx context.Context, request *pb.GetBlobRequest) (*pb.G

// GetChunks retrieves chunks from blobs stored by the relay.
func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*pb.GetChunksReply, error) {
start := time.Now()

if s.config.Timeouts.GetChunksTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.config.Timeouts.GetChunksTimeout)
Expand All @@ -281,13 +283,18 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*

err := s.authenticator.AuthenticateGetChunksRequest(ctx, clientAddress, request, time.Now())
if err != nil {
s.relayMetrics.GetChunksAuthFailures.Increment()
return nil, fmt.Errorf("auth failed: %w", err)
}
}

finishedAuthenticating := time.Now()
if s.authenticator != nil {
s.relayMetrics.GetChunksAuthenticationLatency.ReportLatency(finishedAuthenticating.Sub(start))
}

clientID := string(request.OperatorId)
err := s.chunkRateLimiter.BeginGetChunkOperation(time.Now(), clientID)

if err != nil {
return nil, err
}
Expand All @@ -305,6 +312,9 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*
"error fetching metadata for blob, check if blob exists and is assigned to this relay: %w", err)
}

finishedFetchingMetadata := time.Now()
s.relayMetrics.GetChunksMetadataLatency.ReportLatency(finishedFetchingMetadata.Sub(finishedAuthenticating))

requiredBandwidth, err := computeChunkRequestRequiredBandwidth(request, mMap)
if err != nil {
return nil, fmt.Errorf("error computing required bandwidth: %w", err)
Expand All @@ -324,6 +334,10 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*
return nil, fmt.Errorf("error gathering chunk data: %w", err)
}

finishedFetchingData := time.Now()
s.relayMetrics.GetChunksDataLatency.ReportLatency(finishedFetchingData.Sub(finishedFetchingMetadata))
s.relayMetrics.GetChunksLatency.ReportLatency(time.Since(start))

return &pb.GetChunksReply{
Data: bytesToSend,
}, nil
Expand Down Expand Up @@ -443,6 +457,11 @@ func computeChunkRequestRequiredBandwidth(request *pb.GetChunksRequest, mMap met

// Start starts the server listening for requests. This method will block until the server is stopped.
func (s *Server) Start(ctx context.Context) error {
err := s.relayMetrics.Start()
if err != nil {
return fmt.Errorf("error starting metrics server: %w", err)
}

if s.chainReader != nil && s.metadataProvider != nil {
go func() {
_ = s.RefreshOnchainState(ctx)
Expand All @@ -467,16 +486,10 @@ func (s *Server) Start(ctx context.Context) error {
healthcheck.RegisterHealthServer(name, s.grpcServer)

s.logger.Info("GRPC Listening", "port", s.config.GRPCPort, "address", listener.Addr().String())

if err = s.grpcServer.Serve(listener); err != nil {
return errors.New("could not start GRPC server")
}

err = s.relayMetrics.Start()
if err != nil {
return fmt.Errorf("error starting metrics server: %w", err)
}

return nil
}

Expand Down
5 changes: 4 additions & 1 deletion relay/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package relay

import (
"context"
"fmt"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -56,6 +57,7 @@ func defaultConfig() *Config {
InternalGetProofsTimeout: 10 * time.Second,
InternalGetCoefficientsTimeout: 10 * time.Second,
},
MetricsPort: 9101,
}
}

Expand Down Expand Up @@ -394,8 +396,9 @@ func TestReadWriteChunks(t *testing.T) {
expectedData := make(map[v2.BlobKey][]*encoding.Frame)
fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo)

blobCount := 10
blobCount := 100 // TODO revert this to 10
for i := 0; i < blobCount; i++ {
fmt.Printf("blob %d\n", i) // TODO remove this
header, _, chunks := randomBlobChunks(t)

blobKey, err := header.BlobKey()
Expand Down

0 comments on commit 60f015e

Please sign in to comment.