Merge pull request prometheus#14328 from bboreham/more-dedupelabels
TSDB and scraping: improvements to dedupelabels
bboreham authored Jul 5, 2024
2 parents 709c5d6 + 4d7532f commit 89608c6
Showing 5 changed files with 105 additions and 7 deletions.
10 changes: 10 additions & 0 deletions scrape/metrics.go
@@ -34,6 +34,7 @@ type scrapeMetrics struct {
targetScrapePoolExceededTargetLimit prometheus.Counter
targetScrapePoolTargetLimit *prometheus.GaugeVec
targetScrapePoolTargetsAdded *prometheus.GaugeVec
targetScrapePoolSymbolTableItems *prometheus.GaugeVec
targetSyncIntervalLength *prometheus.SummaryVec
targetSyncFailed *prometheus.CounterVec

@@ -129,6 +130,13 @@ func newScrapeMetrics(reg prometheus.Registerer) (*scrapeMetrics, error) {
},
[]string{"scrape_job"},
)
sm.targetScrapePoolSymbolTableItems = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "prometheus_target_scrape_pool_symboltable_items",
Help: "Current number of symbols in table for this scrape pool.",
},
[]string{"scrape_job"},
)
sm.targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "prometheus_target_scrape_pool_sync_total",
@@ -234,6 +242,7 @@ func newScrapeMetrics(reg prometheus.Registerer) (*scrapeMetrics, error) {
sm.targetScrapePoolExceededTargetLimit,
sm.targetScrapePoolTargetLimit,
sm.targetScrapePoolTargetsAdded,
sm.targetScrapePoolSymbolTableItems,
sm.targetSyncFailed,
// Used by targetScraper.
sm.targetScrapeExceededBodySizeLimit,
@@ -274,6 +283,7 @@ func (sm *scrapeMetrics) Unregister() {
sm.reg.Unregister(sm.targetScrapePoolExceededTargetLimit)
sm.reg.Unregister(sm.targetScrapePoolTargetLimit)
sm.reg.Unregister(sm.targetScrapePoolTargetsAdded)
sm.reg.Unregister(sm.targetScrapePoolSymbolTableItems)
sm.reg.Unregister(sm.targetSyncFailed)
sm.reg.Unregister(sm.targetScrapeExceededBodySizeLimit)
sm.reg.Unregister(sm.targetScrapeCacheFlushForced)
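Taken together, these hunks give the new gauge the same lifecycle as the pool's other per-job metrics: created and registered in newScrapeMetrics, updated on each Sync, and deleted/unregistered on shutdown. A minimal standalone sketch of that lifecycle against the client_golang API (the job name "node" and the value 1234 are made-up examples, not part of this change):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Mirrors sm.targetScrapePoolSymbolTableItems in newScrapeMetrics.
	items := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "prometheus_target_scrape_pool_symboltable_items",
		Help: "Current number of symbols in table for this scrape pool.",
	}, []string{"scrape_job"})
	reg.MustRegister(items)

	// On each Sync, the pool publishes its current symbol-table size.
	items.WithLabelValues("node").Set(1234)

	// On stop, the pool's series is deleted so no stale value lingers,
	// and Unregister removes the collector entirely.
	items.DeleteLabelValues("node")
	reg.Unregister(items)

	fmt.Println("gauge lifecycle complete")
}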
24 changes: 17 additions & 7 deletions scrape/scrape.go
@@ -246,6 +246,7 @@ func (sp *scrapePool) stop() {
sp.metrics.targetScrapePoolSyncsCounter.DeleteLabelValues(sp.config.JobName)
sp.metrics.targetScrapePoolTargetLimit.DeleteLabelValues(sp.config.JobName)
sp.metrics.targetScrapePoolTargetsAdded.DeleteLabelValues(sp.config.JobName)
sp.metrics.targetScrapePoolSymbolTableItems.DeleteLabelValues(sp.config.JobName)
sp.metrics.targetSyncIntervalLength.DeleteLabelValues(sp.config.JobName)
sp.metrics.targetSyncFailed.DeleteLabelValues(sp.config.JobName)
}
@@ -273,6 +274,15 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {

sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))

sp.restartLoops(reuseCache)
oldClient.CloseIdleConnections()
sp.metrics.targetReloadIntervalLength.WithLabelValues(time.Duration(sp.config.ScrapeInterval).String()).Observe(
time.Since(start).Seconds(),
)
return nil
}

func (sp *scrapePool) restartLoops(reuseCache bool) {
var (
wg sync.WaitGroup
interval = time.Duration(sp.config.ScrapeInterval)
@@ -313,7 +323,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
client: sp.client,
timeout: timeout,
bodySizeLimit: bodySizeLimit,
acceptHeader: acceptHeader(cfg.ScrapeProtocols),
acceptHeader: acceptHeader(sp.config.ScrapeProtocols),
acceptEncodingHeader: acceptEncodingHeader(enableCompression),
}
newLoop = sp.newLoop(scrapeLoopOptions{
@@ -352,11 +362,10 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
sp.targetMtx.Unlock()

wg.Wait()
oldClient.CloseIdleConnections()
sp.metrics.targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(start).Seconds(),
)
}

// Must be called with sp.mtx held.
func (sp *scrapePool) checkSymbolTable() {
// Here we take steps to clear out the symbol table if it has grown a lot.
// After waiting some time for things to settle, we take the size of the symbol-table.
// If, after some more time, the table has grown to twice that size, we start a new one.
@@ -367,11 +376,10 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
} else if sp.symbolTable.Len() > 2*sp.initialSymbolTableLen {
sp.symbolTable = labels.NewSymbolTable()
sp.initialSymbolTableLen = 0
sp.restartLoops(false) // To drop all caches.
}
sp.lastSymbolTableCheck = time.Now()
}

return nil
}

// Sync converts target groups into actual scrape targets and synchronizes
@@ -408,8 +416,10 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
}
}
}
sp.metrics.targetScrapePoolSymbolTableItems.WithLabelValues(sp.config.JobName).Set(float64(sp.symbolTable.Len()))
sp.targetMtx.Unlock()
sp.sync(all)
sp.checkSymbolTable()

sp.metrics.targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
time.Since(start).Seconds(),
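Only part of checkSymbolTable is visible above; the first branch and the enclosing time check are collapsed in this view. A plausible reconstruction of the whole function, assuming the hidden branch records a baseline size on the first pass and that the settling period is a fixed constant (both are assumptions; the constant's value is invented here):

// Must be called with sp.mtx held.
func (sp *scrapePool) checkSymbolTable() {
	// Clear out the symbol table if it has grown a lot: after waiting for
	// things to settle, record the table's size; if it later grows to
	// twice that size, start a fresh table.
	const minTimeToCleanSymbolTable = 5 * time.Minute // assumed value
	if time.Since(sp.lastSymbolTableCheck) > minTimeToCleanSymbolTable {
		if sp.initialSymbolTableLen == 0 { // First check for this table: record baseline (assumed).
			sp.initialSymbolTableLen = sp.symbolTable.Len()
		} else if sp.symbolTable.Len() > 2*sp.initialSymbolTableLen {
			sp.symbolTable = labels.NewSymbolTable()
			sp.initialSymbolTableLen = 0
			sp.restartLoops(false) // To drop all caches.
		}
		sp.lastSymbolTableCheck = time.Now()
	}
}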
3 changes: 3 additions & 0 deletions tsdb/db.go
@@ -1407,6 +1407,9 @@ func (db *DB) compactHead(head *RangeHead) error {
if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil {
return fmt.Errorf("head memory truncate: %w", err)
}

db.head.RebuildSymbolTable(db.logger)

return nil
}

68 changes: 68 additions & 0 deletions tsdb/head_dedupelabels.go
@@ -16,6 +16,9 @@
package tsdb

import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/prometheus/prometheus/model/labels"
)

@@ -25,3 +28,68 @@ func (s *memSeries) labels() labels.Labels {
defer s.Unlock()
return s.lset
}

// RebuildSymbolTable goes through all the series in h, builds a SymbolTable with all names and values,
// then replaces each series' Labels with one using that SymbolTable.
func (h *Head) RebuildSymbolTable(logger log.Logger) *labels.SymbolTable {
level.Info(logger).Log("msg", "RebuildSymbolTable starting")
st := labels.NewSymbolTable()
builder := labels.NewScratchBuilderWithSymbolTable(st, 0)
rebuildLabels := func(lbls labels.Labels) labels.Labels {
builder.Reset()
lbls.Range(func(l labels.Label) {
builder.Add(l.Name, l.Value)
})
return builder.Labels()
}

for i := 0; i < h.series.size; i++ {
h.series.locks[i].Lock()

for _, s := range h.series.hashes[i].unique {
s.Lock()
s.lset = rebuildLabels(s.lset)
s.Unlock()
}

for _, all := range h.series.hashes[i].conflicts {
for _, s := range all {
s.Lock()
s.lset = rebuildLabels(s.lset)
s.Unlock()
}
}

h.series.locks[i].Unlock()
}
type withReset interface{ ResetSymbolTable(*labels.SymbolTable) }
if e, ok := h.exemplars.(withReset); ok {
e.ResetSymbolTable(st)
}
level.Info(logger).Log("msg", "RebuildSymbolTable finished", "size", st.Len())
return st
}

func (ce *CircularExemplarStorage) ResetSymbolTable(st *labels.SymbolTable) {
builder := labels.NewScratchBuilderWithSymbolTable(st, 0)
rebuildLabels := func(lbls labels.Labels) labels.Labels {
builder.Reset()
lbls.Range(func(l labels.Label) {
builder.Add(l.Name, l.Value)
})
return builder.Labels()
}

ce.lock.RLock()
defer ce.lock.RUnlock()

for _, v := range ce.index {
v.seriesLabels = rebuildLabels(v.seriesLabels)
}
for i := range ce.exemplars {
if ce.exemplars[i].ref == nil {
continue
}
ce.exemplars[i].exemplar.Labels = rebuildLabels(ce.exemplars[i].exemplar.Labels)
}
}
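Both RebuildSymbolTable and ResetSymbolTable lean on the same idiom: a ScratchBuilder bound to one shared SymbolTable, so every label name and value is interned once and reused across series. A small standalone sketch of that idiom (the label names and values are invented for illustration):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	st := labels.NewSymbolTable()
	b := labels.NewScratchBuilderWithSymbolTable(st, 2)

	// First series: interns "instance", "job" and their values.
	b.Add("instance", "host-1:9100")
	b.Add("job", "node")
	b.Sort()
	s1 := b.Labels()

	// Second series: the shared strings resolve to the symbols interned above.
	b.Reset()
	b.Add("instance", "host-2:9100")
	b.Add("job", "node")
	b.Sort()
	s2 := b.Labels()

	fmt.Println(s1, s2, "distinct symbols:", st.Len())
}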
7 changes: 7 additions & 0 deletions tsdb/head_other.go
@@ -16,10 +16,17 @@
package tsdb

import (
"github.com/go-kit/log"

"github.com/prometheus/prometheus/model/labels"
)

// Helper method to access labels; trivial when not using dedupelabels.
func (s *memSeries) labels() labels.Labels {
return s.lset
}

// No-op when not using dedupelabels.
func (h *Head) RebuildSymbolTable(logger log.Logger) *labels.SymbolTable {
return nil
}
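The two files are presumably selected by Go build constraints, with tsdb/head_dedupelabels.go carrying the real rebuild logic and tsdb/head_other.go the no-op, so callers such as compactHead can invoke RebuildSymbolTable unconditionally. Assuming the tag is named dedupelabels (the //go:build lines sit above the visible hunks), a build enabling it would look like:

go build -tags dedupelabels ./cmd/prometheus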
