diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index 9ac580a4..df65d0b2 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -802,7 +802,7 @@ func Test_Ratelimits_Priority(t *testing.T) { limiter, ok := server.authorizer.Limiter.(*ratelimiter.TokenBucketRateLimiter) require.True(t, ok) limiter.Limits[ratelimiter.PUBLISH] = &ratelimiter.Limit{MaxTokens: 1, RatePerMinute: 0} - limiter.PriorityMultiplier = 2 + limiter.PublishPriorityMultiplier = 2 envs := makeEnvelopes(3) _, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[0:2]}) require.NoError(t, err) diff --git a/pkg/ratelimiter/rate_limiter.go b/pkg/ratelimiter/rate_limiter.go index 00c865d0..0d391925 100644 --- a/pkg/ratelimiter/rate_limiter.go +++ b/pkg/ratelimiter/rate_limiter.go @@ -14,12 +14,13 @@ import ( type LimitType string const ( - PRIORITY_MULTIPLIER = uint16(5) - DEFAULT_RATE_PER_MINUTE = uint16(2000) - DEFAULT_MAX_TOKENS = uint16(10000) - PUBLISH_RATE_PER_MINUTE = uint16(200) - PUBLISH_MAX_TOKENS = uint16(1000) - MAX_UINT_16 = 65535 + DEFAULT_PRIORITY_MULTIPLIER = uint16(5) + PUBLISH_PRIORITY_MULTIPLIER = uint16(25) + DEFAULT_RATE_PER_MINUTE = uint16(2000) + DEFAULT_MAX_TOKENS = uint16(10000) + PUBLISH_RATE_PER_MINUTE = uint16(200) + PUBLISH_MAX_TOKENS = uint16(1000) + MAX_UINT_16 = 65535 DEFAULT LimitType = "DEF" PUBLISH LimitType = "PUB" @@ -70,13 +71,14 @@ func (l Limit) Refill(entry *Entry, multiplier uint16) { // TokenBucketRateLimiter implements the RateLimiter interface type TokenBucketRateLimiter struct { - log *zap.Logger - ctx context.Context - mutex sync.RWMutex - newBuckets *Buckets // buckets that can be added to - oldBuckets *Buckets // buckets to be swept for expired entries - PriorityMultiplier uint16 - Limits map[LimitType]*Limit + log *zap.Logger + ctx context.Context + mutex sync.RWMutex + newBuckets *Buckets // buckets that can be added to + oldBuckets *Buckets // buckets to be swept for expired entries + PriorityMultiplier uint16 + PublishPriorityMultiplier uint16 + Limits map[LimitType]*Limit } func NewTokenBucketRateLimiter(ctx context.Context, log *zap.Logger) *TokenBucketRateLimiter { @@ -86,7 +88,8 @@ func NewTokenBucketRateLimiter(ctx context.Context, log *zap.Logger) *TokenBucke // TODO: need to periodically clear out expired items to avoid unlimited growth of the map. tb.newBuckets = NewBuckets(log, "buckets1") tb.oldBuckets = NewBuckets(log, "buckets2") - tb.PriorityMultiplier = PRIORITY_MULTIPLIER + tb.PriorityMultiplier = DEFAULT_PRIORITY_MULTIPLIER + tb.PublishPriorityMultiplier = PUBLISH_PRIORITY_MULTIPLIER tb.Limits = map[LimitType]*Limit{ DEFAULT: {DEFAULT_MAX_TOKENS, DEFAULT_RATE_PER_MINUTE}, PUBLISH: {PUBLISH_MAX_TOKENS, PUBLISH_RATE_PER_MINUTE}, @@ -106,7 +109,11 @@ func (rl *TokenBucketRateLimiter) fillAndReturnEntry(limitType LimitType, bucket limit := rl.getLimit(limitType) multiplier := uint16(1) if isPriority { - multiplier = rl.PriorityMultiplier + if limitType == PUBLISH { + multiplier = rl.PublishPriorityMultiplier + } else { + multiplier = rl.PriorityMultiplier + } } rl.mutex.RLock() if entry := rl.oldBuckets.getAndRefill(bucket, limit, multiplier, false); entry != nil { diff --git a/pkg/ratelimiter/rate_limiter_test.go b/pkg/ratelimiter/rate_limiter_test.go index c9046184..0cb30efc 100644 --- a/pkg/ratelimiter/rate_limiter_test.go +++ b/pkg/ratelimiter/rate_limiter_test.go @@ -65,7 +65,17 @@ func TestSpendAllowListed(t *testing.T) { // Set last seen to 5 minutes ago entry.lastSeen = time.Now().Add(-5 * time.Minute) entry = rl.fillAndReturnEntry(DEFAULT, walletAddress, true) - require.Equal(t, entry.tokens, uint16(5*DEFAULT_RATE_PER_MINUTE*PRIORITY_MULTIPLIER)) + require.Equal(t, entry.tokens, uint16(5*DEFAULT_RATE_PER_MINUTE*DEFAULT_PRIORITY_MULTIPLIER)) +} + +func TestSpendAllowListedPublish(t *testing.T) { + logger, _ := zap.NewDevelopment() + rl := NewTokenBucketRateLimiter(context.Background(), logger) + entry := rl.newBuckets.getAndRefill(walletAddress, &Limit{0, 0}, 1, true) + // Set last seen to 5 minutes ago + entry.lastSeen = time.Now().Add(-5 * time.Minute) + entry = rl.fillAndReturnEntry(PUBLISH, walletAddress, true) + require.Equal(t, entry.tokens, uint16(5*PUBLISH_RATE_PER_MINUTE*PUBLISH_PRIORITY_MULTIPLIER)) } func TestMaxUint16(t *testing.T) { @@ -75,7 +85,7 @@ func TestMaxUint16(t *testing.T) { // Set last seen to 1 million minutes ago entry.lastSeen = time.Now().Add(-1000000 * time.Minute) entry = rl.fillAndReturnEntry(DEFAULT, walletAddress, true) - require.Equal(t, entry.tokens, DEFAULT_MAX_TOKENS*PRIORITY_MULTIPLIER) + require.Equal(t, entry.tokens, DEFAULT_MAX_TOKENS*DEFAULT_PRIORITY_MULTIPLIER) } // Ensures that the map can be accessed concurrently