Skip to content

Commit

Permalink
Receiver: cache matchers for series calls
Browse files Browse the repository at this point in the history
We have tried caching matchers before with a time-based expiration cache, this time we are trying with LRU cache.

We saw some of our receivers busy with compiling regexes and with high CPU usage, similar to the profile of the benchmark I added here:

* Adding matcher cache for method `MatchersToPromMatchers` and a new version which uses the cache.
* The main change is in `matchesExternalLabels` function which now receives a cache instance.

adding matcher cache and refactor matchers

Co-authored-by: Andre Branchizio <[email protected]>

Signed-off-by: Pedro Tanaka <[email protected]>

Using the cache in proxy and tsdb stores (only receiver)

Signed-off-by: Pedro Tanaka <[email protected]>

fixing problem with deep equality

Signed-off-by: Pedro Tanaka <[email protected]>

adding some docs

Signed-off-by: Pedro Tanaka <[email protected]>

Adding benchmark

Signed-off-by: Pedro Tanaka <[email protected]>

undo unecessary changes

Signed-off-by: Pedro Tanaka <[email protected]>

Adjusting metric names

Signed-off-by: Pedro Tanaka <[email protected]>

adding changelog

Signed-off-by: Pedro Tanaka <[email protected]>

wiring changes to the receiver

Signed-off-by: Pedro Tanaka <[email protected]>

Fixing linting

Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka committed May 13, 2024
1 parent 2d738f0 commit 3f852a5
Show file tree
Hide file tree
Showing 16 changed files with 364 additions and 62 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Changed

- [#7353](https://github.com/thanos-io/thanos/pull/7353) Receive: cache matchers for series calls.

### Removed

## [v0.35.0](https://github.com/thanos-io/thanos/tree/release-0.35) - 02.05.2024
Expand Down
15 changes: 15 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tls"
)
Expand Down Expand Up @@ -204,6 +205,14 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var cache *storepb.MatchersCache
if conf.matcherCacheSize > 0 {
cache, err = storepb.NewMatchersCache(storepb.WithSize(conf.matcherCacheSize), storepb.WithPromRegistry(reg))
if err != nil {
return errors.Wrap(err, "create matchers cache")
}
}

dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
Expand All @@ -214,6 +223,7 @@ func runReceive(
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
cache,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{
Intern: conf.writerInterning,
Expand Down Expand Up @@ -322,6 +332,7 @@ func runReceive(

options := []store.ProxyStoreOption{
store.WithProxyStoreDebugLogging(debugLogging),
store.WithMatcherCache(cache),
}

proxy := store.NewProxyStore(
Expand Down Expand Up @@ -838,6 +849,8 @@ type receiveConfig struct {
limitsConfigReloadTimer time.Duration

asyncForwardWorkerCount uint

matcherCacheSize int
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -973,6 +986,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"about order.").
Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize)

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
Expand Down
5 changes: 4 additions & 1 deletion pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -55,6 +56,8 @@ func TestQuerier_Proxy(t *testing.T) {
files, err := filepath.Glob("testdata/promql/**/*.test")
testutil.Ok(t, err)
testutil.Equals(t, 10, len(files), "%v", files)
cache, err := storepb.NewMatchersCache()
testutil.Ok(t, err)

logger := log.NewLogfmtLogger(os.Stderr)
t.Run("proxy", func(t *testing.T) {
Expand All @@ -63,7 +66,7 @@ func TestQuerier_Proxy(t *testing.T) {
logger,
nil,
store.NewProxyStore(logger, nil, func() []store.Client { return sc.get() },
component.Debug, nil, 5*time.Minute, store.EagerRetrieval),
component.Debug, nil, 5*time.Minute, store.EagerRetrieval, store.WithMatcherCache(cache)),
1000000,
5*time.Minute,
)
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m, &WriterOptions{})
Expand Down
6 changes: 5 additions & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -59,6 +60,7 @@ type MultiTSDB struct {
allowOutOfOrderUpload bool
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig
cache *storepb.MatchersCache
}

// NewMultiTSDB creates new MultiTSDB.
Expand All @@ -73,6 +75,7 @@ func NewMultiTSDB(
bucket objstore.Bucket,
allowOutOfOrderUpload bool,
hashFunc metadata.HashFunc,
cache *storepb.MatchersCache,
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
Expand All @@ -90,6 +93,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
cache: cache,
}
}

Expand Down Expand Up @@ -654,7 +658,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
shipper.DefaultMetaFilename,
)
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset))
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, store.WithMatcherCacheInstance(t.cache)), s, ship, exemplars.NewTSDB(s, lset))
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down
50 changes: 22 additions & 28 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,14 @@ func TestMultiTSDB(t *testing.T) {

logger := log.NewLogfmtLogger(os.Stderr)
t.Run("run fresh", func(t *testing.T) {
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
MaxExemplars: 100,
EnableExemplarStorage: true,
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
MaxExemplars: 100,
EnableExemplarStorage: true,
}, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc, nil)
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
Expand Down Expand Up @@ -141,6 +134,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -172,19 +166,12 @@ func TestMultiTSDB(t *testing.T) {

t.Run("flush with one sample produces a block", func(t *testing.T) {
const testTenant = "test_tenant"
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
}, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc, nil)
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
Expand Down Expand Up @@ -451,6 +438,7 @@ func TestMultiTSDBPrune(t *testing.T) {
test.bucket,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -520,6 +508,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
objstore.NewInMemBucket(),
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -581,6 +570,7 @@ func TestAlignedHeadFlush(t *testing.T) {
test.bucket,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -655,6 +645,7 @@ func TestMultiTSDBStats(t *testing.T) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -684,6 +675,7 @@ func TestMultiTSDBWithNilStore(t *testing.T) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -725,6 +717,7 @@ func TestProxyLabelValues(t *testing.T) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -815,6 +808,7 @@ func BenchmarkMultiTSDB(b *testing.B) {
nil,
false,
metadata.NoneFunc,
nil,
)
defer func() { testutil.Ok(b, m.Close()) }()

Expand Down
2 changes: 2 additions & 0 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -810,6 +811,7 @@ func initializeMultiTSDB(dir string) *MultiTSDB {
bucket,
false,
metadata.NoneFunc,
nil,
)

return m
Expand Down
2 changes: 2 additions & 0 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func TestWriter(t *testing.T) {
nil,
false,
metadata.NoneFunc,
nil,
)
t.Cleanup(func() { testutil.Ok(t, m.Close()) })

Expand Down Expand Up @@ -436,6 +437,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
nil,
false,
metadata.NoneFunc,
nil,
)
b.Cleanup(func() { testutil.Ok(b, m.Close()) })

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *LocalStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.I
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels)
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, nil)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -524,8 +524,16 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que

// matchesExternalLabels returns false if given matchers are not matching external labels.
// If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels.
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []*labels.Matcher, error) {
tms, err := storepb.MatchersToPromMatchers(ms...)
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache *storepb.MatchersCache) (bool, []*labels.Matcher, error) {
var (
tms []*labels.Matcher
err error
)
if cache != nil {
tms, err = storepb.MatchersToPromMatchersCached(cache, ms...)
} else {
tms, err = storepb.MatchersToPromMatchers(ms...)
}
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -573,7 +581,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -636,7 +644,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type ProxyStore struct {
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
matcherCache *storepb.MatchersCache
}

type proxyStoreMetrics struct {
Expand All @@ -109,7 +110,7 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
}
}

// BucketStoreOption are functions that configure BucketStore.
// ProxyStoreOption are functions that configure the ProxyStore.
type ProxyStoreOption func(s *ProxyStore)

// WithProxyStoreDebugLogging toggles debug logging.
Expand All @@ -126,6 +127,13 @@ func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption {
}
}

// WithMatcherCache sets the matcher cache instance for the proxy.
func WithMatcherCache(cache *storepb.MatchersCache) ProxyStoreOption {
return func(s *ProxyStore) {
s.matcherCache = cache
}
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
Expand Down Expand Up @@ -292,7 +300,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
reqLogger = log.With(reqLogger, "request", originalRequest.String())
}

match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels)
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
Loading

0 comments on commit 3f852a5

Please sign in to comment.