Skip to content

Commit

Permalink
Made suggested changes.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Nov 27, 2024
1 parent f8327df commit e925517
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 55 deletions.
6 changes: 1 addition & 5 deletions relay/blob_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ func newBlobProvider(
fetchTimeout: fetchTimeout,
}

c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize)
err := c.WithWeightCalculator(computeBlobCacheWeight)
if err != nil {
return nil, fmt.Errorf("error creating blob cache: %w", err)
}
c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight)

cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](c, maxIOConcurrency, server.fetchBlob)
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions relay/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ type Cache[K comparable, V any] interface {
// of the cache in and of itself.
Put(key K, value V)

// WithWeightCalculator sets the weight calculator for the cache. May only be called
// when the cache is empty. The weight calculator should be an idempotent function that
// always returns the same output given the same input.
WithWeightCalculator(weightCalculator WeightCalculator[K, V]) error

// Size returns the number of key-value pairs in the cache.
Size() int

Expand Down
28 changes: 21 additions & 7 deletions relay/cache/cache_accessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func TestRandomOperationsSingleThread(t *testing.T) {
return &str, nil
}
cacheSize := rand.Intn(dataSize) + 1
c := NewFIFOCache[int, *string](uint64(cacheSize))
c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
require.NoError(t, err)
Expand Down Expand Up @@ -81,7 +83,9 @@ func TestCacheMisses(t *testing.T) {
return &str, nil
}

c := NewFIFOCache[int, *string](uint64(cacheSize))
c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
require.NoError(t, err)
Expand Down Expand Up @@ -146,7 +150,9 @@ func ParallelAccessTest(t *testing.T, sleepEnabled bool) {
}
cacheSize := rand.Intn(dataSize) + 1

c := NewFIFOCache[int, *string](uint64(cacheSize))
c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
require.NoError(t, err)
Expand Down Expand Up @@ -217,7 +223,9 @@ func TestParallelAccessWithError(t *testing.T) {
}
cacheSize := 100

c := NewFIFOCache[int, *string](uint64(cacheSize))
c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
require.NoError(t, err)
Expand Down Expand Up @@ -291,7 +299,9 @@ func TestConcurrencyLimiter(t *testing.T) {
}

cacheSize := 100
c := NewFIFOCache[int, *string](uint64(cacheSize))
c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, maxConcurrency, accessor)
require.NoError(t, err)
Expand Down Expand Up @@ -347,7 +357,9 @@ func TestOriginalRequesterTimesOut(t *testing.T) {
}
cacheSize := rand.Intn(dataSize) + 1

c := NewFIFOCache[int, *string](uint64(cacheSize))
c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
require.NoError(t, err)
Expand Down Expand Up @@ -437,7 +449,9 @@ func TestSecondaryRequesterTimesOut(t *testing.T) {
}
cacheSize := rand.Intn(dataSize) + 1

c := NewFIFOCache[int, *string](uint64(cacheSize))
c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 {
return 1
})

ca, err := NewCacheAccessor[int, *string](c, 0, accessor)
require.NoError(t, err)
Expand Down
17 changes: 2 additions & 15 deletions relay/cache/fifo-cache.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cache

import (
"errors"
"github.com/emirpasic/gods/queues"
"github.com/emirpasic/gods/queues/linkedlistqueue"
)
Expand All @@ -20,15 +19,11 @@ type FIFOCache[K comparable, V any] struct {
}

// NewFIFOCache creates a new FIFOCache.
func NewFIFOCache[K comparable, V any](maxWeight uint64) *FIFOCache[K, V] {
defaultWeightCalculator := func(key K, value V) uint64 {
return uint64(1)
}

func NewFIFOCache[K comparable, V any](maxWeight uint64, calculator WeightCalculator[K, V]) *FIFOCache[K, V] {
return &FIFOCache[K, V]{
maxWeight: maxWeight,
data: make(map[K]V),
weightCalculator: defaultWeightCalculator,
weightCalculator: calculator,
expirationQueue: linkedlistqueue.New(),
}
}
Expand Down Expand Up @@ -69,14 +64,6 @@ func (f *FIFOCache[K, V]) Put(key K, value V) {
}
}

func (f *FIFOCache[K, V]) WithWeightCalculator(weightCalculator WeightCalculator[K, V]) error {
if f.Size() > 0 {
return errors.New("cannot set weight calculator on non-empty cache")
}
f.weightCalculator = weightCalculator
return nil
}

func (f *FIFOCache[K, V]) Size() int {
return len(f.data)
}
Expand Down
12 changes: 4 additions & 8 deletions relay/cache/fifo_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ func TestExpirationOrder(t *testing.T) {
tu.InitializeRandom()

maxWeight := uint64(10 + rand.Intn(10))
c := NewFIFOCache[int, int](maxWeight)
c := NewFIFOCache[int, int](maxWeight, func(key int, value int) uint64 {
return 1
})

require.Equal(t, uint64(0), c.Weight())
require.Equal(t, 0, c.Size())
Expand Down Expand Up @@ -83,9 +85,7 @@ func TestWeightedValues(t *testing.T) {
return uint64(key)
}

c := NewFIFOCache[int, int](maxWeight)
err := c.WithWeightCalculator(weightCalculator)
require.NoError(t, err)
c := NewFIFOCache[int, int](maxWeight, weightCalculator)

expectedValues := make(map[int]int)

Expand Down Expand Up @@ -135,8 +135,4 @@ func TestWeightedValues(t *testing.T) {
require.True(t, ok)
require.Equal(t, v, value)
}

// Sanity check, attempting to update the weight calculator function at this point should fail.
err = c.WithWeightCalculator(weightCalculator)
require.Error(t, err)
}
6 changes: 1 addition & 5 deletions relay/chunk_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ func newChunkProvider(
coefficientFetchTimeout: coefficientFetchTimeout,
}

c := cache.NewFIFOCache[blobKeyWithMetadata, []*encoding.Frame](cacheSize)
err := c.WithWeightCalculator(computeFramesCacheWeight)
if err != nil {
return nil, fmt.Errorf("error setting weight calculator: %w", err)
}
c := cache.NewFIFOCache[blobKeyWithMetadata, []*encoding.Frame](cacheSize, computeFramesCacheWeight)

cacheAccessor, err := cache.NewCacheAccessor[blobKeyWithMetadata, []*encoding.Frame](
c,
Expand Down
2 changes: 1 addition & 1 deletion relay/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
MaxGRPCMessageSize: ctx.Int(flags.MaxGRPCMessageSizeFlag.Name),
MetadataCacheSize: ctx.Int(flags.MetadataCacheSizeFlag.Name),
MetadataMaxConcurrency: ctx.Int(flags.MetadataMaxConcurrencyFlag.Name),
BlobCacheSize: ctx.Uint64(flags.BlobCacheSizeFlag.Name),
BlobCacheBytes: ctx.Uint64(flags.BlobCacheBytes.Name),
BlobMaxConcurrency: ctx.Int(flags.BlobMaxConcurrencyFlag.Name),
ChunkCacheSize: ctx.Uint64(flags.ChunkCacheSizeFlag.Name),
ChunkMaxConcurrency: ctx.Int(flags.ChunkMaxConcurrencyFlag.Name),
Expand Down
6 changes: 3 additions & 3 deletions relay/cmd/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "METADATA_MAX_CONCURRENCY"),
Value: 32,
}
BlobCacheSizeFlag = cli.Uint64Flag{
Name: common.PrefixFlag(FlagPrefix, "blob-cache-size"),
BlobCacheBytes = cli.Uint64Flag{
Name: common.PrefixFlag(FlagPrefix, "blob-cache-bytes"),
Usage: "The size of the blob cache, in bytes.",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "BLOB_CACHE_SIZE"),
Expand Down Expand Up @@ -297,7 +297,7 @@ var optionalFlags = []cli.Flag{
MaxGRPCMessageSizeFlag,
MetadataCacheSizeFlag,
MetadataMaxConcurrencyFlag,
BlobCacheSizeFlag,
BlobCacheBytes,
BlobMaxConcurrencyFlag,
ChunkCacheSizeFlag,
ChunkMaxConcurrencyFlag,
Expand Down
5 changes: 4 additions & 1 deletion relay/metadata_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ func newMetadataProvider(
}
server.blobParamsMap.Store(blobParamsMap)

c := cache.NewFIFOCache[v2.BlobKey, *blobMetadata](uint64(metadataCacheSize))
c := cache.NewFIFOCache[v2.BlobKey, *blobMetadata](uint64(metadataCacheSize),
func(key v2.BlobKey, value *blobMetadata) uint64 {
return uint64(1)
})

metadataCache, err := cache.NewCacheAccessor[v2.BlobKey, *blobMetadata](
c,
Expand Down
6 changes: 3 additions & 3 deletions relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ type Config struct {
// goroutines.
MetadataMaxConcurrency int

// BlobCacheSize is the maximum size of the blob cache, in bytes.
BlobCacheSize uint64
// BlobCacheBytes is the maximum size of the blob cache, in bytes.
BlobCacheBytes uint64

// BlobMaxConcurrency puts a limit on the maximum number of concurrent blob fetches actively running on goroutines.
BlobMaxConcurrency int
Expand Down Expand Up @@ -153,7 +153,7 @@ func NewServer(
ctx,
logger,
blobStore,
config.BlobCacheSize,
config.BlobCacheBytes,
config.BlobMaxConcurrency,
config.Timeouts.InternalGetBlobTimeout)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions relay/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func defaultConfig() *Config {
MaxGRPCMessageSize: 1024 * 1024 * 300,
MetadataCacheSize: 1024 * 1024,
MetadataMaxConcurrency: 32,
BlobCacheSize: 32,
BlobCacheBytes: 1024 * 1024,
BlobMaxConcurrency: 32,
ChunkCacheSize: 32,
ChunkCacheSize: 1024 * 1024,
ChunkMaxConcurrency: 32,
MaxKeysPerGetChunksRequest: 1024,
RateLimits: limiter.Config{
Expand Down

0 comments on commit e925517

Please sign in to comment.