Skip to content

Commit

Permalink
TSDB: Lock around access to labels in head under -tags dedupelabels (p…
Browse files Browse the repository at this point in the history
…rometheus#14322)

* TSDB: Document what needs locking in memSeries

* TSDB: Lock around access to series labels

So we can modify them to reset the symbol-table.

* TSDB: Make label locking conditional on build tag

---------

Signed-off-by: Bryan Boreham <[email protected]>
  • Loading branch information
bboreham authored Jul 5, 2024
1 parent 9198952 commit 709c5d6
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 21 deletions.
21 changes: 12 additions & 9 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -1759,20 +1759,20 @@ type seriesHashmap struct {

func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
if s, found := m.unique[hash]; found {
if labels.Equal(s.lset, lset) {
if labels.Equal(s.labels(), lset) {
return s
}
}
for _, s := range m.conflicts[hash] {
if labels.Equal(s.lset, lset) {
if labels.Equal(s.labels(), lset) {
return s
}
}
return nil
}

func (m *seriesHashmap) set(hash uint64, s *memSeries) {
if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
if existing, found := m.unique[hash]; !found || labels.Equal(existing.labels(), s.labels()) {
m.unique[hash] = s
return
}
Expand All @@ -1781,7 +1781,7 @@ func (m *seriesHashmap) set(hash uint64, s *memSeries) {
}
l := m.conflicts[hash]
for i, prev := range l {
if labels.Equal(prev.lset, s.lset) {
if labels.Equal(prev.labels(), s.labels()) {
l[i] = s
return
}
Expand Down Expand Up @@ -1931,7 +1931,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
s.hashes[hashShard].del(hash, series.ref)
delete(s.series[refShard], series.ref)
deletedForCallback[series.ref] = series.lset
deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function.
}

s.iterForDeletion(check)
Expand Down Expand Up @@ -2023,7 +2023,7 @@ func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries fu
}
// Setting the series in the s.hashes marks the creation of series
// as any further calls to this methods would return that series.
s.seriesLifecycleCallback.PostCreation(series.lset)
s.seriesLifecycleCallback.PostCreation(series.labels())

i = uint64(series.ref) & uint64(s.size-1)

Expand Down Expand Up @@ -2064,16 +2064,19 @@ func (s sample) Type() chunkenc.ValueType {
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.
type memSeries struct {
sync.Mutex

// Members up to the Mutex are not changed after construction, so can be accessed without a lock.
ref chunks.HeadSeriesRef
lset labels.Labels
meta *metadata.Metadata

// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
// been explicitly enabled in TSDB.
shardHash uint64

// Everything after here should only be accessed with the lock held.
sync.Mutex

lset labels.Labels // Locking required with -tags dedupelabels, not otherwise.

// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
//
Expand Down
6 changes: 3 additions & 3 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty()

err := a.head.exemplars.ValidateExemplar(s.lset, e)
err := a.head.exemplars.ValidateExemplar(s.labels(), e)
if err != nil {
if errors.Is(err, storage.ErrDuplicateExemplar) || errors.Is(err, storage.ErrExemplarsDisabled) {
// Duplicate, don't return an error but don't accept the exemplar.
Expand Down Expand Up @@ -708,7 +708,7 @@ func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
return 0, labels.EmptyLabels()
}
// returned labels must be suitable to pass to Append()
return storage.SeriesRef(s.ref), s.lset
return storage.SeriesRef(s.ref), s.labels()
}

// log writes all headAppender's data to the WAL.
Expand Down Expand Up @@ -816,7 +816,7 @@ func (a *headAppender) Commit() (err error) {
continue
}
// We don't instrument exemplar appends here, all is instrumented by storage.
if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil {
if err := a.head.exemplars.AddExemplar(s.labels(), e.exemplar); err != nil {
if errors.Is(err, storage.ErrOutOfOrderExemplar) {
continue
}
Expand Down
27 changes: 27 additions & 0 deletions tsdb/head_dedupelabels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build dedupelabels

package tsdb

import (
"github.com/prometheus/prometheus/model/labels"
)

// Helper method to access labels under lock.
func (s *memSeries) labels() labels.Labels {
s.Lock()
defer s.Unlock()
return s.lset
}
25 changes: 25 additions & 0 deletions tsdb/head_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !dedupelabels

package tsdb

import (
"github.com/prometheus/prometheus/model/labels"
)

// Helper method to access labels; trivial when not using dedupelabels.
func (s *memSeries) labels() labels.Labels {
return s.lset
}
8 changes: 4 additions & 4 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
}

slices.SortFunc(series, func(a, b *memSeries) int {
return labels.Compare(a.lset, b.lset)
return labels.Compare(a.labels(), b.labels())
})

// Convert back to list.
Expand Down Expand Up @@ -189,7 +189,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
h.head.metrics.seriesNotFound.Inc()
return storage.ErrNotFound
}
builder.Assign(s.lset)
builder.Assign(s.labels())

if chks == nil {
return nil
Expand Down Expand Up @@ -259,7 +259,7 @@ func (h *headIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef,
return "", storage.ErrNotFound
}

value := memSeries.lset.Get(label)
value := memSeries.labels().Get(label)
if value == "" {
return "", storage.ErrNotFound
}
Expand All @@ -283,7 +283,7 @@ func (h *headIndexReader) LabelNamesFor(ctx context.Context, series index.Postin
// when series was garbage collected after the caller got the series IDs.
continue
}
memSeries.lset.Range(func(lbl labels.Label) {
memSeries.labels().Range(func(lbl labels.Label) {
namesMap[lbl.Name] = struct{}{}
})
}
Expand Down
8 changes: 4 additions & 4 deletions tsdb/head_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
// replaying the WAL, so lets just log the error if it's not that type.
err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
err = h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
if err != nil && errors.Is(err, storage.ErrOutOfOrderExemplar) {
level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
}
Expand Down Expand Up @@ -448,7 +448,7 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
) {
level.Debug(h.logger).Log(
"msg", "M-mapped chunks overlap on a duplicate series record",
"series", mSeries.lset.String(),
"series", mSeries.labels().String(),
"oldref", mSeries.ref,
"oldmint", mSeries.mmappedChunks[0].minTime,
"oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
Expand Down Expand Up @@ -932,7 +932,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {

buf.PutByte(chunkSnapshotRecordTypeSeries)
buf.PutBE64(uint64(s.ref))
record.EncodeLabels(&buf, s.lset)
record.EncodeLabels(&buf, s.labels())
buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused.

s.Lock()
Expand Down Expand Up @@ -1485,7 +1485,7 @@ Outer:
continue
}

if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{
if err := h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{
Labels: e.Labels,
Value: e.V,
Ts: e.T,
Expand Down
2 changes: 1 addition & 1 deletion tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
oh.head.metrics.seriesNotFound.Inc()
return storage.ErrNotFound
}
builder.Assign(s.lset)
builder.Assign(s.labels())

if chks == nil {
return nil
Expand Down

0 comments on commit 709c5d6

Please sign in to comment.