From b74427d07ffc248876c6b0777070afb4c909722c Mon Sep 17 00:00:00 2001 From: Bailhache Pierre Date: Thu, 8 Aug 2024 14:57:43 +0200 Subject: [PATCH] feat: Add support of multiple kind of cache for relabeling components --- go.sum | 1 + internal/component/faro/receiver/server.go | 1 - .../component/prometheus/relabel/relabel.go | 87 +++++++---- .../prometheus/relabel/relabel_test.go | 50 +++--- .../prometheusconvert/component/relabel.go | 8 +- internal/service/cache/cache.go | 118 ++++++++++++++ internal/service/cache/cache_inmemory.go | 105 +++++++++++++ internal/service/cache/cache_memcached.go | 145 +++++++++++++++++ internal/service/cache/cache_redis.go | 147 ++++++++++++++++++ 9 files changed, 605 insertions(+), 57 deletions(-) create mode 100644 internal/service/cache/cache.go create mode 100644 internal/service/cache/cache_inmemory.go create mode 100644 internal/service/cache/cache_memcached.go create mode 100644 internal/service/cache/cache_redis.go diff --git a/go.sum b/go.sum index 4140e2242d..2a61584d20 100644 --- a/go.sum +++ b/go.sum @@ -434,6 +434,7 @@ github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4 github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI= +github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk= github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOSchFS7Eo= github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg= github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc= diff --git a/internal/component/faro/receiver/server.go b/internal/component/faro/receiver/server.go index 13b2c0d140..1cc9fdeb1f 100644 --- a/internal/component/faro/receiver/server.go +++ b/internal/component/faro/receiver/server.go @@ -80,7 +80,6 @@ func (s *server) Run(ctx context.Context) error { }) mw := middleware.Instrument{ - RouteMatcher: r, Duration: s.metrics.requestDuration, RequestBodySize: s.metrics.rxMessageSize, ResponseBodySize: s.metrics.txMessageSize, diff --git a/internal/component/prometheus/relabel/relabel.go b/internal/component/prometheus/relabel/relabel.go index 69c579190a..a49d071b8b 100644 --- a/internal/component/prometheus/relabel/relabel.go +++ b/internal/component/prometheus/relabel/relabel.go @@ -9,9 +9,9 @@ import ( alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" "github.com/grafana/alloy/internal/component/prometheus" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/service/cache" "github.com/grafana/alloy/internal/service/labelstore" "github.com/grafana/alloy/internal/service/livedebugging" - lru "github.com/hashicorp/golang-lru/v2" prometheus_client "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" @@ -20,11 +20,19 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" + "go.uber.org/atomic" ) const name = "prometheus.relabel" +// labelAndID stores both the globalrefid for the label and the id itself. We store the id so that it doesn't have +// to be recalculated again. +type labelAndID struct { + Labels labels.Labels `json:"labels"` + ID uint64 `json:"id"` +} + func init() { component.Register(component.Registration{ Name: name, @@ -47,22 +55,38 @@ type Arguments struct { // The relabelling rules to apply to each metric before it's forwarded. MetricRelabelConfigs []*alloy_relabel.Config `alloy:"rule,block,optional"` - // Cache size to use for LRU cache. - CacheSize int `alloy:"max_cache_size,attr,optional"` + // DEPRECATED Use type = inmemory and cache_size field. + InMemoryCacheSizeDeprecated int `alloy:"max_cache_size,attr,optional"` + + // Cache backend configuration. + CacheConfig cache.CacheConfig `alloy:"cache,block,optional"` } // SetToDefault implements syntax.Defaulter. func (arg *Arguments) SetToDefault() { *arg = Arguments{ - CacheSize: 100_000, + CacheConfig: cache.CacheConfig{ + Backend: cache.InMemory, + InMemory: cache.InMemoryCacheConfig{ + CacheSize: 100_000, + }, + }, } } // Validate implements syntax.Validator. func (arg *Arguments) Validate() error { - if arg.CacheSize <= 0 { - return fmt.Errorf("max_cache_size must be greater than 0 and is %d", arg.CacheSize) + switch arg.CacheConfig.Backend { + case cache.InMemory: + if arg.CacheConfig.InMemory.CacheSize <= 0 { + return fmt.Errorf("cache_size must be greater than 0 and is %d", arg.CacheConfig.InMemory.CacheSize) + } + case cache.Memcached: + case cache.Redis: + default: + return fmt.Errorf("unknown cache backend, should be one of %s", cache.SupportedCaches) } + return nil } @@ -91,7 +115,7 @@ type Component struct { debugDataPublisher livedebugging.DebugDataPublisher cacheMut sync.RWMutex - cache *lru.Cache[uint64, *labelAndID] + cache cache.Cache[labelAndID] } var ( @@ -101,7 +125,13 @@ var ( // New creates a new prometheus.relabel component. func New(o component.Options, args Arguments) (*Component, error) { - cache, err := lru.New[uint64, *labelAndID](args.CacheSize) + // to be removed after deprecation of max cache size + if args.CacheConfig.Backend == "" && args.InMemoryCacheSizeDeprecated != 0 { + args.CacheConfig.Backend = cache.InMemory + args.CacheConfig.InMemory.CacheSize = args.InMemoryCacheSizeDeprecated + } + + relabelCache, err := cache.NewCache[labelAndID](args.CacheConfig) if err != nil { return nil, err } @@ -117,7 +147,7 @@ func New(o component.Options, args Arguments) (*Component, error) { } c := &Component{ opts: o, - cache: cache, + cache: relabelCache, ls: data.(labelstore.LabelStore), debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), } @@ -230,7 +260,11 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() newArgs := args.(Arguments) - c.clearCache(newArgs.CacheSize) + + // in case of in_memory cache we need to clean the cache + if newArgs.CacheConfig.Backend == cache.InMemory { + c.clearCache(newArgs.CacheConfig.InMemory.CacheSize) + } c.mrc = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs) c.fanout.UpdateChildren(newArgs.ForwardTo) @@ -253,7 +287,7 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels { c.cacheHits.Inc() // If newLbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels if newLbls != nil { - relabelled = newLbls.labels + relabelled = newLbls.Labels } } else { // Relabel against a copy of the labels to prevent modifying the original @@ -271,7 +305,7 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels { } // Set the cache size to the cache.len // TODO(@mattdurham): Instead of setting this each time could collect on demand for better performance. - c.cacheSize.Set(float64(c.cache.Len())) + // c.cacheSize.Set(float64(c.cache.GetCacheSize())) componentID := livedebugging.ComponentID(c.opts.ID) if c.debugDataPublisher.IsActive(componentID) { @@ -285,22 +319,23 @@ func (c *Component) getFromCache(id uint64) (*labelAndID, bool) { c.cacheMut.RLock() defer c.cacheMut.RUnlock() - fm, found := c.cache.Get(id) - return fm, found + value, err := c.cache.Get(fmt.Sprintf("%d", id)) + + return value, err == nil } func (c *Component) deleteFromCache(id uint64) { c.cacheMut.Lock() defer c.cacheMut.Unlock() c.cacheDeletes.Inc() - c.cache.Remove(id) + + c.cache.Remove(fmt.Sprintf("%d", id)) } func (c *Component) clearCache(cacheSize int) { c.cacheMut.Lock() defer c.cacheMut.Unlock() - cache, _ := lru.New[uint64, *labelAndID](cacheSize) - c.cache = cache + _ = c.cache.Clear(cacheSize) } func (c *Component) addToCache(originalID uint64, lbls labels.Labels, keep bool) { @@ -308,21 +343,15 @@ func (c *Component) addToCache(originalID uint64, lbls labels.Labels, keep bool) defer c.cacheMut.Unlock() if !keep { - c.cache.Add(originalID, nil) + _ = c.cache.Set(fmt.Sprintf("%d", originalID), nil, 0) return } newGlobal := c.ls.GetOrAddGlobalRefID(lbls) - c.cache.Add(originalID, &labelAndID{ - labels: lbls, - id: newGlobal, - }) + + _ = c.cache.Set(fmt.Sprintf("%d", originalID), &labelAndID{ + Labels: lbls, + ID: newGlobal, + }, 0) } func (c *Component) LiveDebugging(_ int) {} - -// labelAndID stores both the globalrefid for the label and the id itself. We store the id so that it doesn't have -// to be recalculated again. -type labelAndID struct { - labels labels.Labels - id uint64 -} diff --git a/internal/component/prometheus/relabel/relabel_test.go b/internal/component/prometheus/relabel/relabel_test.go index 190a881542..ef0e9d84ce 100644 --- a/internal/component/prometheus/relabel/relabel_test.go +++ b/internal/component/prometheus/relabel/relabel_test.go @@ -13,6 +13,7 @@ import ( alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" "github.com/grafana/alloy/internal/component/prometheus" "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/service/cache" "github.com/grafana/alloy/internal/service/labelstore" "github.com/grafana/alloy/internal/service/livedebugging" "github.com/grafana/alloy/internal/util" @@ -25,39 +26,35 @@ import ( "github.com/stretchr/testify/require" ) -func TestCache(t *testing.T) { +func TestLRUCache(t *testing.T) { lc := labelstore.New(nil, prom.DefaultRegisterer) - relabeller := generateRelabel(t) + relabeller := generateRelabelWithLRUCache(t) lbls := labels.FromStrings("__address__", "localhost") relabeller.relabel(0, lbls) - require.True(t, relabeller.cache.Len() == 1) + require.True(t, relabeller.cache.GetCacheSize() == 1) entry, found := relabeller.getFromCache(lc.GetOrAddGlobalRefID(lbls)) require.True(t, found) require.NotNil(t, entry) require.True( t, - lc.GetOrAddGlobalRefID(entry.labels) != lc.GetOrAddGlobalRefID(lbls), + lc.GetOrAddGlobalRefID(entry.Labels) != lc.GetOrAddGlobalRefID(lbls), ) } -func TestUpdateReset(t *testing.T) { - relabeller := generateRelabel(t) - lbls := labels.FromStrings("__address__", "localhost") - relabeller.relabel(0, lbls) - require.True(t, relabeller.cache.Len() == 1) - _ = relabeller.Update(Arguments{ - CacheSize: 100000, - MetricRelabelConfigs: []*alloy_relabel.Config{}, - }) - require.True(t, relabeller.cache.Len() == 0) -} - func TestValidator(t *testing.T) { - args := Arguments{CacheSize: 0} + args := Arguments{ + CacheConfig: cache.CacheConfig{ + Backend: "unknown", + }, + } err := args.Validate() require.Error(t, err) - args.CacheSize = 1 + args.CacheConfig.Backend = cache.InMemory + err = args.Validate() + require.Error(t, err) + + args.CacheConfig.InMemory.CacheSize = 1 err = args.Validate() require.NoError(t, err) } @@ -83,7 +80,7 @@ func TestNil(t *testing.T) { Action: "drop", }, }, - CacheSize: 100000, + InMemoryCacheSizeDeprecated: 100000, }) require.NotNil(t, relabeller) require.NoError(t, err) @@ -93,22 +90,22 @@ func TestNil(t *testing.T) { } func TestLRU(t *testing.T) { - relabeller := generateRelabel(t) + relabeller := generateRelabelWithLRUCache(t) for i := 0; i < 600_000; i++ { lbls := labels.FromStrings("__address__", "localhost", "inc", strconv.Itoa(i)) relabeller.relabel(0, lbls) } - require.True(t, relabeller.cache.Len() == 100_000) + require.True(t, relabeller.cache.GetCacheSize() == 100_000) } func TestLRUNaN(t *testing.T) { - relabeller := generateRelabel(t) + relabeller := generateRelabelWithLRUCache(t) lbls := labels.FromStrings("__address__", "localhost") relabeller.relabel(0, lbls) - require.True(t, relabeller.cache.Len() == 1) + require.True(t, relabeller.cache.GetCacheSize() == 1) relabeller.relabel(math.Float64frombits(value.StaleNaN), lbls) - require.True(t, relabeller.cache.Len() == 0) + require.True(t, relabeller.cache.GetCacheSize() == 0) } func BenchmarkCache(b *testing.B) { @@ -147,7 +144,7 @@ func BenchmarkCache(b *testing.B) { app.Commit() } -func generateRelabel(t *testing.T) *Component { +func generateRelabelWithLRUCache(t *testing.T) *Component { ls := labelstore.New(nil, prom.DefaultRegisterer) fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { require.True(t, l.Has("new_label")) @@ -170,8 +167,9 @@ func generateRelabel(t *testing.T) *Component { Action: "replace", }, }, - CacheSize: 100_000, + InMemoryCacheSizeDeprecated: 100_000, }) + require.NotNil(t, relabeller) require.NoError(t, err) return relabeller diff --git a/internal/converter/internal/prometheusconvert/component/relabel.go b/internal/converter/internal/prometheusconvert/component/relabel.go index f5eaa71ed9..c52a703c1c 100644 --- a/internal/converter/internal/prometheusconvert/component/relabel.go +++ b/internal/converter/internal/prometheusconvert/component/relabel.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/alloy/internal/component/prometheus/relabel" "github.com/grafana/alloy/internal/converter/internal/common" "github.com/grafana/alloy/internal/converter/internal/prometheusconvert/build" + "github.com/grafana/alloy/internal/service/cache" prom_relabel "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" ) @@ -36,7 +37,12 @@ func toRelabelArguments(relabelConfigs []*prom_relabel.Config, forwardTo []stora return &relabel.Arguments{ ForwardTo: forwardTo, MetricRelabelConfigs: ToAlloyRelabelConfigs(relabelConfigs), - CacheSize: 100_000, + CacheConfig: cache.CacheConfig{ + Backend: cache.InMemory, + InMemory: cache.InMemoryCacheConfig{ + CacheSize: 100_000, + }, + }, } } diff --git a/internal/service/cache/cache.go b/internal/service/cache/cache.go new file mode 100644 index 0000000000..382d3f61c5 --- /dev/null +++ b/internal/service/cache/cache.go @@ -0,0 +1,118 @@ +package cache + +import ( + "time" + + "github.com/grafana/dskit/cache" + "github.com/grafana/dskit/flagext" + "github.com/pkg/errors" +) + +const ( + // InMemory is the value for the in-memory cache backend. + InMemory = "inmemory" + + // Memcached is the value for the Memcached cache backend. + Memcached = cache.BackendMemcached + + // Redis is the value for the Redis cache backend. + Redis = cache.BackendRedis + + // Default is the value for the default cache backend. + Default = InMemory +) + +var ( + SupportedCaches = []string{InMemory, Memcached, Redis} + + errUnsupportedCache = errors.New("unsupported cache backend") + errNotFound = errors.New("not found in cache") +) + +type CacheConfig struct { + Backend string `alloy:"backend,attr"` + Memcached MemcachedConfig `alloy:"memcached,block,optional"` + Redis RedisConf `alloy:"redis,block,optional"` + InMemory InMemoryCacheConfig `alloy:"inmemory,block,optional"` +} + +//TODO Those field are copied from dskit/cache for now (only the one mandatory) +// We need to have a better way to manage conf +// For now I used those because we cannot embed 'yaml' tags into alloy configuration +// Ideally we should be using the dskit/cache conf directly, but it means it should not +// be into the alloy configuration ? + +type RedisConf struct { + // Endpoint specifies the endpoint of Redis server. + Endpoint flagext.StringSliceCSV `alloy:"endpoint,attr"` + + // Use the specified Username to authenticate the current connection + // with one of the connections defined in the ACL list when connecting + // to a Redis 6.0 instance, or greater, that is using the Redis ACL system. + Username string `alloy:"username,attr"` + + // Optional password. Must match the password specified in the + // requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower), + // or the User Password when connecting to a Redis 6.0 instance, or greater, + // that is using the Redis ACL system. + Password string `alloy:"password,attr,optional"` + + // DB Database to be selected after connecting to the server. + DB int `alloy:"db,attr"` + + // MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines. + MaxAsyncConcurrency int `yaml:"max_async_concurrency" category:"advanced"` + + // MaxAsyncBufferSize specifies the queue buffer size for SetAsync operations. + MaxAsyncBufferSize int `yaml:"max_async_buffer_size" category:"advanced"` +} + +type MemcachedConfig struct { + // Addresses specifies the list of memcached addresses. The addresses get + // resolved with the DNS provider. + Addresses flagext.StringSliceCSV `alloy:"addresses,attr"` + + // WriteBufferSizeBytes specifies the size of the write buffer (in bytes). The buffer + // is allocated for each connection. + WriteBufferSizeBytes int `alloy:"write_buffer_size_bytes,attr"` + + // ReadBufferSizeBytes specifies the size of the read buffer (in bytes). The buffer + // is allocated for each connection. + ReadBufferSizeBytes int `alloy:"read_buffer_size_bytes,attr"` + + // MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines. + MaxAsyncConcurrency int `yaml:"max_async_concurrency" category:"advanced"` + + // MaxAsyncBufferSize specifies the queue buffer size for SetAsync operations. + MaxAsyncBufferSize int `yaml:"max_async_buffer_size" category:"advanced"` +} + +type InMemoryCacheConfig struct { + CacheSize int `alloy:"cache_size,attr"` +} + +type Cache[valueType any] interface { + Get(key string) (*valueType, error) + GetMultiple(keys []string) (map[string]*valueType, error) + Set(key string, value *valueType, ttl time.Duration) error + SetMultiple(values map[string]*valueType, ttl time.Duration) error + Remove(key string) + Clear(newSize int) error + GetCacheSize() int +} + +// NewCache creates a new cache based on the given configuration +func NewCache[valueType any](cfg CacheConfig) (Cache[valueType], error) { + switch cfg.Backend { + case InMemory: + return NewInMemoryCacheWithConfig[valueType](InMemoryCacheConfig{ + CacheSize: cfg.InMemory.CacheSize, + }) + case Memcached: + return newMemcachedCache[valueType](cfg.Memcached) + case Redis: + return newRedisCache[valueType](cfg.Redis) + default: + return nil, errUnsupportedCache + } +} diff --git a/internal/service/cache/cache_inmemory.go b/internal/service/cache/cache_inmemory.go new file mode 100644 index 0000000000..67af1eecbf --- /dev/null +++ b/internal/service/cache/cache_inmemory.go @@ -0,0 +1,105 @@ +package cache + +import ( + "sync" + "time" + + lru "github.com/hashicorp/golang-lru/v2" +) + +type InMemoryCache[valueType any] struct { + lru *lru.Cache[string, *valueType] + cacheSize int + cacheMut sync.RWMutex +} + +// NewInMemoryCacheWithConfig creates a new thread-safe LRU cache for index entries and ensures the total cache +// size approximately does not exceed maxBytes. +func NewInMemoryCacheWithConfig[valueType any](config InMemoryCacheConfig) (*InMemoryCache[valueType], error) { + c := &InMemoryCache[valueType]{ + cacheSize: config.CacheSize, + } + + // Initialize LRU cache + cache, err := lru.New[string, *valueType](c.cacheSize) + if err != nil { + return nil, err + } + c.lru = cache + + return c, nil +} + +func (c *InMemoryCache[valueType]) Get(key string) (*valueType, error) { + c.cacheMut.RLock() + defer c.cacheMut.RUnlock() + + fm, found := c.lru.Get(key) + if !found { + return nil, errNotFound + } + + return fm, nil +} + +func (c *InMemoryCache[valueType]) GetMultiple(keys []string) (map[string]*valueType, error) { + c.cacheMut.RLock() + defer c.cacheMut.RUnlock() + + values := make(map[string]*valueType, len(keys)) + + for _, key := range keys { + found := false + values[key], found = c.lru.Get(key) + if !found { + return nil, errNotFound + } + } + + return values, nil +} + +func (c *InMemoryCache[valueType]) Remove(key string) { + c.cacheMut.Lock() + defer c.cacheMut.Unlock() + + c.lru.Remove(key) +} + +func (c *InMemoryCache[valueType]) Set(key string, value *valueType, ttl time.Duration) error { + c.cacheMut.Lock() + defer c.cacheMut.Unlock() + + c.lru.Add(key, value) + + return nil +} + +func (c *InMemoryCache[valueType]) SetMultiple(values map[string]*valueType, ttl time.Duration) error { + c.cacheMut.Lock() + defer c.cacheMut.Unlock() + + for key, value := range values { + c.lru.Add(key, value) + } + + return nil +} + +func (c *InMemoryCache[valueType]) Clear(newSize int) error { + c.cacheMut.Lock() + defer c.cacheMut.Unlock() + lru, err := lru.New[string, *valueType](newSize) + if err != nil { + return err + } + + c.lru = lru + return nil +} + +func (c *InMemoryCache[valueType]) GetCacheSize() int { + c.cacheMut.Lock() + defer c.cacheMut.Unlock() + return c.lru.Len() +} diff --git a/internal/service/cache/cache_memcached.go b/internal/service/cache/cache_memcached.go new file mode 100644 index 0000000000..e65c335ce5 --- /dev/null +++ b/internal/service/cache/cache_memcached.go @@ -0,0 +1,145 @@ +package cache + +import ( + "bytes" + "context" + "encoding/gob" + "io" + "os" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/cache" +) + +type MemcachedCache[valueType any] struct { + client *cache.MemcachedClient +} + +func newMemcachedCache[valueType any](cfg MemcachedConfig) (*MemcachedCache[valueType], error) { + client, err := cache.NewMemcachedClientWithConfig( + //TODO NewLogFmtLogger ? Maybe something else + log.NewLogfmtLogger(os.Stdout), + "memcached-cache", + cache.MemcachedClientConfig{ + Addresses: cfg.Addresses, + WriteBufferSizeBytes: cfg.WriteBufferSizeBytes, + ReadBufferSizeBytes: cfg.ReadBufferSizeBytes, + MaxAsyncConcurrency: cfg.MaxAsyncConcurrency, + MaxAsyncBufferSize: cfg.MaxAsyncBufferSize, + }, + //TODO add prometheus registerer here + nil, + ) + + if err != nil { + return nil, err + } + + return &MemcachedCache[valueType]{ + client: client, + }, nil +} + +func (c *MemcachedCache[valueType]) Get(key string) (*valueType, error) { + ctx := context.Background() + var out valueType + + data := c.client.GetMulti(ctx, []string{key}) + if data[key] == nil { + //TODO check if data == nil means only not found ? + // what happens when network errors ? + return nil, errNotFound + } + + decoder := gob.NewDecoder(bytes.NewReader(data[key])) + if err := decoder.Decode(&out); err != nil { + if err != io.EOF && err != io.ErrUnexpectedEOF { + return nil, err + } + } + + return &out, nil +} + +func (c *MemcachedCache[valueType]) GetMultiple(keys []string) (map[string]*valueType, error) { + ctx := context.Background() + + data := c.client.GetMulti(ctx, keys) + if data == nil { + //TODO check if data == nil means only not found ? + // what happens when network errors ? + return nil, errNotFound + } + + result := make(map[string]*valueType, len(keys)) + + for key, rawValue := range data { + decoder := gob.NewDecoder(bytes.NewReader(rawValue)) + if err := decoder.Decode(result[key]); err != nil { + if err != io.EOF && err != io.ErrUnexpectedEOF { + return nil, err + } + } + } + + return result, nil +} + +func (c *MemcachedCache[valueType]) Remove(key string) { + ctx := context.Background() + //TODO manage error + _ = c.client.Delete(ctx, key) + +} + +func (c *MemcachedCache[valueType]) Set(key string, value *valueType, ttl time.Duration) error { + if value == nil { + c.client.SetAsync(key, nil, ttl) + return nil + } + + var indexBuffer bytes.Buffer + + encoder := gob.NewEncoder(&indexBuffer) + if err := encoder.Encode(*value); err != nil { + return err + } + c.client.SetAsync(key, indexBuffer.Bytes(), ttl) + return nil +} + +func (c *MemcachedCache[valueType]) SetMultiple(values map[string]*valueType, ttl time.Duration) error { + var ( + firstErr error + failed int + ) + + for key, value := range values { + var indexBuffer bytes.Buffer + encoder := gob.NewEncoder(&indexBuffer) + + if err := encoder.Encode(*value); err != nil { + return err + } + + if err := c.client.SetAsync(key, indexBuffer.Bytes(), ttl); err != nil { + failed++ + if firstErr == nil { + firstErr = err + } + } + } + + return firstErr +} + +func (c *MemcachedCache[valueType]) Clear(newSize int) error { + // do nothing here + return nil +} + +func (c *MemcachedCache[valueType]) GetCacheSize() int { + // do nothing here + return 0 +} diff --git a/internal/service/cache/cache_redis.go b/internal/service/cache/cache_redis.go new file mode 100644 index 0000000000..276bfa97bc --- /dev/null +++ b/internal/service/cache/cache_redis.go @@ -0,0 +1,147 @@ +package cache + +import ( + "bytes" + "context" + "encoding/gob" + "io" + "os" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/cache" + "github.com/grafana/dskit/flagext" +) + +type RedisCache[valueType any] struct { + client *cache.RedisClient +} + +func newRedisCache[valueType any](cfg RedisConf) (*RedisCache[valueType], error) { + client, err := cache.NewRedisClient( + //TODO NewLogFmtLogger ? Maybe something else + log.NewLogfmtLogger(os.Stdout), + "redis-cache", + cache.RedisClientConfig{ + Endpoint: []string{cfg.Endpoint.String()}, + Username: "default", + Password: flagext.SecretWithValue(""), + MaxAsyncConcurrency: cfg.MaxAsyncConcurrency, + MaxAsyncBufferSize: cfg.MaxAsyncBufferSize, + DB: cfg.DB, + }, + //TODO add prometheus registerer here + nil, + ) + + if err != nil { + return nil, err + } + + return &RedisCache[valueType]{ + client: client, + }, nil +} + +func (c *RedisCache[valueType]) Get(key string) (*valueType, error) { + ctx := context.Background() + var out valueType + + data := c.client.GetMulti(ctx, []string{key}) + if data[key] == nil { + //TODO check if data == nil means only not found ? + // what happens when network errors ? + return nil, errNotFound + } + + decoder := gob.NewDecoder(bytes.NewReader(data[key])) + if err := decoder.Decode(&out); err != nil { + if err != io.EOF && err != io.ErrUnexpectedEOF { + return nil, err + } + } + + return &out, nil +} + +func (c *RedisCache[valueType]) GetMultiple(keys []string) (map[string]*valueType, error) { + ctx := context.Background() + + data := c.client.GetMulti(ctx, keys) + if data == nil { + //TODO check if data == nil means only not found ? + // what happens when network errors ? + return nil, errNotFound + } + + result := make(map[string]*valueType, len(keys)) + + for key, rawValue := range data { + decoder := gob.NewDecoder(bytes.NewReader(rawValue)) + if err := decoder.Decode(result[key]); err != nil { + if err != io.EOF && err != io.ErrUnexpectedEOF { + return nil, err + } + } + } + + return result, nil +} + +func (c *RedisCache[valueType]) Remove(key string) { + ctx := context.Background() + //TODO manage error + _ = c.client.Delete(ctx, key) + +} + +func (c *RedisCache[valueType]) Set(key string, value *valueType, ttl time.Duration) error { + if value == nil { + c.client.SetAsync(key, nil, ttl) + return nil + } + + var indexBuffer bytes.Buffer + + encoder := gob.NewEncoder(&indexBuffer) + if err := encoder.Encode(*value); err != nil { + return err + } + c.client.SetAsync(key, indexBuffer.Bytes(), ttl) + return nil +} + +func (c *RedisCache[valueType]) SetMultiple(values map[string]*valueType, ttl time.Duration) error { + var ( + firstErr error + failed int + ) + + for key, value := range values { + var indexBuffer bytes.Buffer + encoder := gob.NewEncoder(&indexBuffer) + + if err := encoder.Encode(*value); err != nil { + return err + } + + if err := c.client.SetAsync(key, indexBuffer.Bytes(), ttl); err != nil { + failed++ + if firstErr == nil { + firstErr = err + } + } + } + + return firstErr +} + +func (c *RedisCache[valueType]) Clear(newSize int) error { + // do nothing here + return nil +} + +func (c *RedisCache[valueType]) GetCacheSize() int { + // do nothing here + return 0 +}