Skip to content

Commit

Permalink
moving matcher cache to storecache package
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka committed Dec 9, 2024
1 parent 3a10cf1 commit 07a5c89
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 53 deletions.
6 changes: 3 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ import (
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tls"
)
Expand Down Expand Up @@ -226,9 +226,9 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var cache = storepb.NewNoopMatcherCache()
var cache = storecache.NewNoopMatcherCache()
if conf.matcherCacheSize > 0 {
cache, err = storepb.NewMatchersCache(storepb.WithSize(conf.matcherCacheSize), storepb.WithPromRegistry(reg))
cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg))
if err != nil {
return errors.Wrap(err, "failed to create matchers cache")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil/custom"
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestQuerier_Proxy(t *testing.T) {
files, err := filepath.Glob("testdata/promql/**/*.test")
testutil.Ok(t, err)
testutil.Equals(t, 10, len(files), "%v", files)
cache, err := storepb.NewMatchersCache()
cache, err := storecache.NewMatchersCache()
testutil.Ok(t, err)

logger := log.NewLogfmtLogger(os.Stderr)
Expand Down
7 changes: 4 additions & 3 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/thanos-io/thanos/pkg/receive/expandedpostingscache"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand All @@ -64,7 +65,7 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

matcherCache storepb.MatchersCache
matcherCache storecache.MatchersCache

tsdbClients []store.Client
exemplarClients map[string]*exemplars.TSDB
Expand Down Expand Up @@ -97,7 +98,7 @@ func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption {
}
}

func WithMatchersCache(cache storepb.MatchersCache) MultiTSDBOption {
func WithMatchersCache(cache storecache.MatchersCache) MultiTSDBOption {
return func(s *MultiTSDB) {
s.matcherCache = cache
}
Expand Down Expand Up @@ -135,7 +136,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
matcherCache: storepb.NewNoopMatcherCache(),
matcherCache: storecache.NewNoopMatcherCache(),
}

for _, option := range options {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storepb_test
package storecache_test

import (
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"

storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

func TestMatchersCache(t *testing.T) {
cache, err := storepb.NewMatchersCache(storepb.WithSize(2))
cache, err := storecache.NewMatchersCache(storecache.WithSize(2))
testutil.Ok(t, err)

matcher := storepb.LabelMatcher{
Expand Down Expand Up @@ -86,7 +87,7 @@ func TestMatchersCache(t *testing.T) {
}

func BenchmarkMatchersCache(b *testing.B) {
cache, err := storepb.NewMatchersCache(storepb.WithSize(100))
cache, err := storecache.NewMatchersCache(storecache.WithSize(100))
if err != nil {
b.Fatalf("failed to create cache: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storepb
package storecache

import (
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/sync/singleflight"

"github.com/thanos-io/thanos/pkg/store/storepb"
)

const DefaultCacheSize = 200

type NewItemFunc func(matcher LabelMatcher) (*labels.Matcher, error)
type NewItemFunc func(matcher storepb.LabelMatcher) (*labels.Matcher, error)

type MatchersCache interface {
// GetOrSet retrieves a matcher from cache or creates and stores it if not present.
// If the matcher is not in cache, it uses the provided newItem function to create it.
GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error)
GetOrSet(key storepb.LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error)
}

// Ensure implementations satisfy the interface.
Expand All @@ -36,14 +35,14 @@ func NewNoopMatcherCache() MatchersCache {
}

// GetOrSet implements MatchersCache by always creating a new matcher without caching.
func (n *NoopMatcherCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
func (n *NoopMatcherCache) GetOrSet(key storepb.LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem(key)
}

// LruMatchersCache implements MatchersCache with an LRU cache and metrics.
type LruMatchersCache struct {
reg prometheus.Registerer
cache *lru.Cache[LabelMatcher, *labels.Matcher]
cache *lru.Cache[storepb.LabelMatcher, *labels.Matcher]
metrics *matcherCacheMetrics
size int
sf singleflight.Group
Expand Down Expand Up @@ -74,7 +73,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
}
cache.metrics = newMatcherCacheMetrics(cache.reg)

lruCache, err := lru.NewWithEvict[LabelMatcher, *labels.Matcher](cache.size, cache.onEvict)
lruCache, err := lru.NewWithEvict[storepb.LabelMatcher, *labels.Matcher](cache.size, cache.onEvict)
if err != nil {
return nil, err
}
Expand All @@ -83,7 +82,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
return cache, nil
}

func (c *LruMatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
func (c *LruMatchersCache) GetOrSet(key storepb.LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
c.metrics.requestsTotal.Inc()
if item, ok := c.cache.Get(key); ok {
c.metrics.hitsTotal.Inc()
Expand Down Expand Up @@ -111,7 +110,7 @@ func (c *LruMatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*lab
return v.(*labels.Matcher), nil
}

func (c *LruMatchersCache) onEvict(_ LabelMatcher, _ *labels.Matcher) {
func (c *LruMatchersCache) onEvict(_ storepb.LabelMatcher, _ *labels.Matcher) {
c.metrics.evicted.Inc()
c.metrics.numItems.Set(float64(c.cache.Len()))
}
Expand Down Expand Up @@ -148,3 +147,18 @@ func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics {
}),
}
}

// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers.
// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions.
// NOTE: It (can) allocate memory.
func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
pm, err := cache.GetOrSet(m, storepb.MatcherToPromMatcher)
if err != nil {
return nil, err
}
res = append(res, pm)
}
return res, nil
}
3 changes: 2 additions & 1 deletion pkg/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand Down Expand Up @@ -130,7 +131,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storepb.NewNoopMatcherCache())
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storecache.NewNoopMatcherCache())
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
1 change: 1 addition & 0 deletions pkg/store/matcher_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package store
11 changes: 6 additions & 5 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
Expand Down Expand Up @@ -125,7 +126,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache())
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache())
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -488,13 +489,13 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que

// matchesExternalLabels returns false if given matchers are not matching external labels.
// If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels.
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache storepb.MatchersCache) (bool, []*labels.Matcher, error) {
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache storecache.MatchersCache) (bool, []*labels.Matcher, error) {
var (
tms []*labels.Matcher
err error
)

tms, err = storepb.MatchersToPromMatchersCached(cache, ms...)
tms, err = storecache.MatchersToPromMatchersCached(cache, ms...)
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -542,7 +543,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache())
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -605,7 +606,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache())
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
Expand Down Expand Up @@ -89,7 +90,7 @@ type ProxyStore struct {
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
matcherCache storepb.MatchersCache
matcherCache storecache.MatchersCache
enableDedup bool
}

Expand Down Expand Up @@ -139,7 +140,7 @@ func WithoutDedup() ProxyStoreOption {
}

// WithMatcherCache sets the matcher cache instance for the proxy.
func WithMatcherCache(cache storepb.MatchersCache) ProxyStoreOption {
func WithMatcherCache(cache storecache.MatchersCache) ProxyStoreOption {
return func(s *ProxyStore) {
s.matcherCache = cache
}
Expand Down Expand Up @@ -176,7 +177,7 @@ func NewProxyStore(
retrievalStrategy: retrievalStrategy,
tsdbSelector: DefaultSelector,
enableDedup: true,
matcherCache: storepb.NewNoopMatcherCache(),
matcherCache: storecache.NewNoopMatcherCache(),
}

for _, option := range options {
Expand Down
9 changes: 5 additions & 4 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
Expand Down Expand Up @@ -2086,7 +2087,7 @@ func BenchmarkProxySeries(b *testing.B) {
func BenchmarkProxySeriesRegex(b *testing.B) {
tb := testutil.NewTB(b)

cache, err := storepb.NewMatchersCache(storepb.WithSize(200))
cache, err := storecache.NewMatchersCache(storecache.WithSize(200))
testutil.Ok(b, err)

q := NewProxyStore(nil,
Expand Down Expand Up @@ -2174,7 +2175,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
responseTimeout: 5 * time.Second,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
matcherCache: storepb.NewNoopMatcherCache(),
matcherCache: storecache.NewNoopMatcherCache(),
}

var allResps []*storepb.SeriesResponse
Expand Down Expand Up @@ -2311,7 +2312,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: respStrategy,
tsdbSelector: DefaultSelector,
matcherCache: storepb.NewNoopMatcherCache(),
matcherCache: storecache.NewNoopMatcherCache(),
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -2349,7 +2350,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: respStrategy,
tsdbSelector: DefaultSelector,
matcherCache: storepb.NewNoopMatcherCache(),
matcherCache: storecache.NewNoopMatcherCache(),
}

ctx := context.Background()
Expand Down
15 changes: 0 additions & 15 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,21 +395,6 @@ func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
return res, nil
}

// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers.
// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions.
// NOTE: It (can) allocate memory.
func MatchersToPromMatchersCached(cache MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
pm, err := cache.GetOrSet(m, MatcherToPromMatcher)
if err != nil {
return nil, err
}
res = append(res, pm)
}
return res, nil
}

// MatcherToPromMatcher converts a Thanos label matcher to Prometheus label matcher.
func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) {
var t labels.MatchType
Expand Down
Loading

0 comments on commit 07a5c89

Please sign in to comment.