From 07a5c89975553e671c5e4543ac3fd4a2a64a0610 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Mon, 9 Dec 2024 15:52:48 +0100 Subject: [PATCH] moving matcher cache to storecache package Signed-off-by: Pedro Tanaka --- cmd/thanos/receive.go | 6 +-- pkg/query/query_test.go | 3 +- pkg/receive/multitsdb.go | 7 ++-- .../{storepb => cache}/matcher_cache_test.go | 7 ++-- .../matchers_cache.go} | 38 +++++++++++++------ pkg/store/local.go | 3 +- pkg/store/matcher_cache.go | 1 + pkg/store/prometheus.go | 11 +++--- pkg/store/proxy.go | 7 ++-- pkg/store/proxy_test.go | 9 +++-- pkg/store/storepb/custom.go | 15 -------- pkg/store/tsdb.go | 7 ++-- 12 files changed, 61 insertions(+), 53 deletions(-) rename pkg/store/{storepb => cache}/matcher_cache_test.go (93%) rename pkg/store/{storepb/matcher_cache.go => cache/matchers_cache.go} (73%) create mode 100644 pkg/store/matcher_cache.go diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 57c62b19df..18e56f9413 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -50,8 +50,8 @@ import ( grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" - "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tls" ) @@ -226,9 +226,9 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } - var cache = storepb.NewNoopMatcherCache() + var cache = storecache.NewNoopMatcherCache() if conf.matcherCacheSize > 0 { - cache, err = storepb.NewMatchersCache(storepb.WithSize(conf.matcherCacheSize), storepb.WithPromRegistry(reg)) + cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg)) if err != nil { return errors.Wrap(err, "failed to create matchers cache") } diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index 749e8fc0f2..a0ec36659b 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -17,6 +17,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/testutil/custom" @@ -55,7 +56,7 @@ func TestQuerier_Proxy(t *testing.T) { files, err := filepath.Glob("testdata/promql/**/*.test") testutil.Ok(t, err) testutil.Equals(t, 10, len(files), "%v", files) - cache, err := storepb.NewMatchersCache() + cache, err := storecache.NewMatchersCache() testutil.Ok(t, err) logger := log.NewLogfmtLogger(os.Stderr) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 6ec4df672e..ee7d95572d 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -39,6 +39,7 @@ import ( "github.com/thanos-io/thanos/pkg/receive/expandedpostingscache" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -64,7 +65,7 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig - matcherCache storepb.MatchersCache + matcherCache storecache.MatchersCache tsdbClients []store.Client exemplarClients map[string]*exemplars.TSDB @@ -97,7 +98,7 @@ func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption { } } -func WithMatchersCache(cache storepb.MatchersCache) MultiTSDBOption { +func WithMatchersCache(cache storecache.MatchersCache) MultiTSDBOption { return func(s *MultiTSDB) { s.matcherCache = cache } @@ -135,7 +136,7 @@ func NewMultiTSDB( bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, hashFunc: hashFunc, - matcherCache: storepb.NewNoopMatcherCache(), + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { diff --git a/pkg/store/storepb/matcher_cache_test.go b/pkg/store/cache/matcher_cache_test.go similarity index 93% rename from pkg/store/storepb/matcher_cache_test.go rename to pkg/store/cache/matcher_cache_test.go index c76b942ae5..c99d4c9cd8 100644 --- a/pkg/store/storepb/matcher_cache_test.go +++ b/pkg/store/cache/matcher_cache_test.go @@ -1,7 +1,7 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. -package storepb_test +package storecache_test import ( "testing" @@ -9,11 +9,12 @@ import ( "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" ) func TestMatchersCache(t *testing.T) { - cache, err := storepb.NewMatchersCache(storepb.WithSize(2)) + cache, err := storecache.NewMatchersCache(storecache.WithSize(2)) testutil.Ok(t, err) matcher := storepb.LabelMatcher{ @@ -86,7 +87,7 @@ func TestMatchersCache(t *testing.T) { } func BenchmarkMatchersCache(b *testing.B) { - cache, err := storepb.NewMatchersCache(storepb.WithSize(100)) + cache, err := storecache.NewMatchersCache(storecache.WithSize(100)) if err != nil { b.Fatalf("failed to create cache: %v", err) } diff --git a/pkg/store/storepb/matcher_cache.go b/pkg/store/cache/matchers_cache.go similarity index 73% rename from pkg/store/storepb/matcher_cache.go rename to pkg/store/cache/matchers_cache.go index 289a314eab..11f44c6bc2 100644 --- a/pkg/store/storepb/matcher_cache.go +++ b/pkg/store/cache/matchers_cache.go @@ -1,24 +1,23 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package storepb +package storecache import ( - lru "github.com/hashicorp/golang-lru/v2" + "github.com/hashicorp/golang-lru/v2" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "golang.org/x/sync/singleflight" + + "github.com/thanos-io/thanos/pkg/store/storepb" ) const DefaultCacheSize = 200 -type NewItemFunc func(matcher LabelMatcher) (*labels.Matcher, error) +type NewItemFunc func(matcher storepb.LabelMatcher) (*labels.Matcher, error) type MatchersCache interface { // GetOrSet retrieves a matcher from cache or creates and stores it if not present. // If the matcher is not in cache, it uses the provided newItem function to create it. - GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) + GetOrSet(key storepb.LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) } // Ensure implementations satisfy the interface. @@ -36,14 +35,14 @@ func NewNoopMatcherCache() MatchersCache { } // GetOrSet implements MatchersCache by always creating a new matcher without caching. -func (n *NoopMatcherCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { +func (n *NoopMatcherCache) GetOrSet(key storepb.LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { return newItem(key) } // LruMatchersCache implements MatchersCache with an LRU cache and metrics. type LruMatchersCache struct { reg prometheus.Registerer - cache *lru.Cache[LabelMatcher, *labels.Matcher] + cache *lru.Cache[storepb.LabelMatcher, *labels.Matcher] metrics *matcherCacheMetrics size int sf singleflight.Group @@ -74,7 +73,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) { } cache.metrics = newMatcherCacheMetrics(cache.reg) - lruCache, err := lru.NewWithEvict[LabelMatcher, *labels.Matcher](cache.size, cache.onEvict) + lruCache, err := lru.NewWithEvict[storepb.LabelMatcher, *labels.Matcher](cache.size, cache.onEvict) if err != nil { return nil, err } @@ -83,7 +82,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) { return cache, nil } -func (c *LruMatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { +func (c *LruMatchersCache) GetOrSet(key storepb.LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { c.metrics.requestsTotal.Inc() if item, ok := c.cache.Get(key); ok { c.metrics.hitsTotal.Inc() @@ -111,7 +110,7 @@ func (c *LruMatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*lab return v.(*labels.Matcher), nil } -func (c *LruMatchersCache) onEvict(_ LabelMatcher, _ *labels.Matcher) { +func (c *LruMatchersCache) onEvict(_ storepb.LabelMatcher, _ *labels.Matcher) { c.metrics.evicted.Inc() c.metrics.numItems.Set(float64(c.cache.Len())) } @@ -148,3 +147,18 @@ func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics { }), } } + +// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers. +// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions. +// NOTE: It (can) allocate memory. +func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) { + res := make([]*labels.Matcher, 0, len(ms)) + for _, m := range ms { + pm, err := cache.GetOrSet(m, storepb.MatcherToPromMatcher) + if err != nil { + return nil, err + } + res = append(res, pm) + } + return res, nil +} diff --git a/pkg/store/local.go b/pkg/store/local.go index 4d63d2d64a..bcd22c3fe8 100644 --- a/pkg/store/local.go +++ b/pkg/store/local.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -130,7 +131,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storepb.NewNoopMatcherCache()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storecache.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/matcher_cache.go b/pkg/store/matcher_cache.go new file mode 100644 index 0000000000..72440ea2a6 --- /dev/null +++ b/pkg/store/matcher_cache.go @@ -0,0 +1 @@ +package store diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index ba326b4e98..8ba60a00fe 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -36,6 +36,7 @@ import ( "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -125,7 +126,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache()) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -488,13 +489,13 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que // matchesExternalLabels returns false if given matchers are not matching external labels. // If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels. -func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache storepb.MatchersCache) (bool, []*labels.Matcher, error) { +func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache storecache.MatchersCache) (bool, []*labels.Matcher, error) { var ( tms []*labels.Matcher err error ) - tms, err = storepb.MatchersToPromMatchersCached(cache, ms...) + tms, err = storecache.MatchersToPromMatchersCached(cache, ms...) if err != nil { return false, nil, err } @@ -542,7 +543,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache()) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -605,7 +606,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache()) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 6d414f6922..b03703da8e 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -25,6 +25,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" + "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" @@ -89,7 +90,7 @@ type ProxyStore struct { retrievalStrategy RetrievalStrategy debugLogging bool tsdbSelector *TSDBSelector - matcherCache storepb.MatchersCache + matcherCache storecache.MatchersCache enableDedup bool } @@ -139,7 +140,7 @@ func WithoutDedup() ProxyStoreOption { } // WithMatcherCache sets the matcher cache instance for the proxy. -func WithMatcherCache(cache storepb.MatchersCache) ProxyStoreOption { +func WithMatcherCache(cache storecache.MatchersCache) ProxyStoreOption { return func(s *ProxyStore) { s.matcherCache = cache } @@ -176,7 +177,7 @@ func NewProxyStore( retrievalStrategy: retrievalStrategy, tsdbSelector: DefaultSelector, enableDedup: true, - matcherCache: storepb.NewNoopMatcherCache(), + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 7d51972977..cd11ead83e 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -31,6 +31,7 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" + "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" @@ -2086,7 +2087,7 @@ func BenchmarkProxySeries(b *testing.B) { func BenchmarkProxySeriesRegex(b *testing.B) { tb := testutil.NewTB(b) - cache, err := storepb.NewMatchersCache(storepb.WithSize(200)) + cache, err := storecache.NewMatchersCache(storecache.WithSize(200)) testutil.Ok(b, err) q := NewProxyStore(nil, @@ -2174,7 +2175,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { responseTimeout: 5 * time.Second, retrievalStrategy: EagerRetrieval, tsdbSelector: DefaultSelector, - matcherCache: storepb.NewNoopMatcherCache(), + matcherCache: storecache.NewNoopMatcherCache(), } var allResps []*storepb.SeriesResponse @@ -2311,7 +2312,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, - matcherCache: storepb.NewNoopMatcherCache(), + matcherCache: storecache.NewNoopMatcherCache(), } ctx, cancel := context.WithCancel(context.Background()) @@ -2349,7 +2350,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, - matcherCache: storepb.NewNoopMatcherCache(), + matcherCache: storecache.NewNoopMatcherCache(), } ctx := context.Background() diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index f0d7b9dd60..46ce34ed57 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -395,21 +395,6 @@ func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) { return res, nil } -// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers. -// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions. -// NOTE: It (can) allocate memory. -func MatchersToPromMatchersCached(cache MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) { - res := make([]*labels.Matcher, 0, len(ms)) - for _, m := range ms { - pm, err := cache.GetOrSet(m, MatcherToPromMatcher) - if err != nil { - return nil, err - } - res = append(res, pm) - } - return res, nil -} - // MatcherToPromMatcher converts a Thanos label matcher to Prometheus label matcher. func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) { var t labels.MatchType diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index f0b13615bb..78c9bb6207 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -53,7 +54,7 @@ func WithCuckooMetricNameStoreFilter() TSDBStoreOption { } } -func WithMatcherCacheInstance(cache storepb.MatchersCache) TSDBStoreOption { +func WithMatcherCacheInstance(cache storecache.MatchersCache) TSDBStoreOption { return func(s *TSDBStore) { s.matcherCache = cache } @@ -68,7 +69,7 @@ type TSDBStore struct { component component.StoreAPI buffers sync.Pool maxBytesPerFrame int - matcherCache storepb.MatchersCache + matcherCache storecache.MatchersCache extLset labels.Labels startStoreFilterUpdate bool @@ -119,7 +120,7 @@ func NewTSDBStore( b := make([]byte, 0, initialBufSize) return &b }}, - matcherCache: storepb.NewNoopMatcherCache(), + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options {