Skip to content

Commit

Permalink
[BUGFIX] TSDB: Only query chunks up to truncation time (prometheus#14948
Browse files Browse the repository at this point in the history
)

If the query overlaps the range currently undergoing compaction, we
should only fetch chunks up to that time. Need to store that min time
in `HeadAndOOOIndexReader`.

Signed-off-by: Bryan Boreham <[email protected]>
  • Loading branch information
bboreham authored Sep 20, 2024
1 parent 5b9148e commit 9215252
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 18 deletions.
8 changes: 6 additions & 2 deletions tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2060,6 +2060,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {

overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
var headQuerier storage.Querier
inoMint := mint
if maxt >= db.head.MinTime() || overlapsOOO {
rh := NewRangeHead(db.head, mint, maxt)
var err error
Expand All @@ -2084,13 +2085,14 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
if err != nil {
return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
}
inoMint = newMint
}
}

if overlapsOOO {
// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier)
headQuerier = NewHeadAndOOOQuerier(inoMint, mint, maxt, db.head, isoState, headQuerier)
}

if headQuerier != nil {
Expand Down Expand Up @@ -2136,6 +2138,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer

overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
var headQuerier storage.ChunkQuerier
inoMint := mint
if maxt >= db.head.MinTime() || overlapsOOO {
rh := NewRangeHead(db.head, mint, maxt)
headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt)
Expand All @@ -2159,13 +2162,14 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
if err != nil {
return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
}
inoMint = newMint
}
}

if overlapsOOO {
// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier)
headQuerier = NewHeadAndOOOChunkQuerier(inoMint, mint, maxt, db.head, isoState, headQuerier)
}

if headQuerier != nil {
Expand Down
23 changes: 12 additions & 11 deletions tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var _ IndexReader = &HeadAndOOOIndexReader{}

type HeadAndOOOIndexReader struct {
*headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
inoMint int64
lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
}

Expand All @@ -49,13 +50,13 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator
return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables)
}

func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
func NewHeadAndOOOIndexReader(head *Head, inoMint, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
hr := &headIndexReader{
head: head,
mint: mint,
maxt: maxt,
}
return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef}
return &HeadAndOOOIndexReader{hr, inoMint, lastGarbageCollectedMmapRef}
}

func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
Expand All @@ -76,9 +77,9 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
*chks = (*chks)[:0]

if s.ooo != nil {
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks)
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
}
*chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks)
*chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks)
return nil
}

Expand All @@ -87,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
//
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered.
func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error {
func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))

addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
Expand Down Expand Up @@ -128,7 +129,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
}

if includeInOrder {
tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks)
tmpChks = appendSeriesChunks(s, inoMint, maxt, tmpChks)
}

// There is nothing to do if we did not collect any chunk.
Expand Down Expand Up @@ -476,7 +477,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
return nil
}

return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, chks)
return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
}

func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
Expand Down Expand Up @@ -516,7 +517,7 @@ type HeadAndOOOQuerier struct {
querier storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the mean time, in which case we ignore it and not close it in the end.
}

func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
func NewHeadAndOOOQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
cr := &headChunkReader{
head: head,
mint: mint,
Expand All @@ -527,7 +528,7 @@ func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolatio
mint: mint,
maxt: maxt,
head: head,
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
index: NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef),
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
querier: querier,
}
Expand Down Expand Up @@ -568,7 +569,7 @@ type HeadAndOOOChunkQuerier struct {
querier storage.ChunkQuerier
}

func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier {
func NewHeadAndOOOChunkQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier {
cr := &headChunkReader{
head: head,
mint: mint,
Expand All @@ -579,7 +580,7 @@ func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIso
mint: mint,
maxt: maxt,
head: head,
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
index: NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef),
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
querier: querier,
}
Expand Down
8 changes: 4 additions & 4 deletions tsdb/ooo_head_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
})
}

ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMaxT, 0)
ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)

var chks []chunks.Meta
var b labels.ScratchBuilder
Expand Down Expand Up @@ -451,7 +451,7 @@ func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenari
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// We first want to test using a head index reader that covers the biggest query interval
oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMaxT, 0)
oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
values, err := oh.LabelValues(ctx, "foo", matchers...)
sort.Strings(values)
Expand Down Expand Up @@ -857,7 +857,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {

// The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err = ir.Series(s1Ref, &b, &chks)
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(

// The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err = ir.Series(s1Ref, &b, &chks)
Expand Down
2 changes: 1 addition & 1 deletion tsdb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3235,7 +3235,7 @@ func BenchmarkQueries(b *testing.B) {
qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples)
require.NoError(b, err)
isoState := head.oooIso.TrackReadAfter(0)
qOOOHead := NewHeadAndOOOQuerier(1, nSamples, head, isoState, qHead)
qOOOHead := NewHeadAndOOOQuerier(1, 1, nSamples, head, isoState, qHead)

queryTypes = append(queryTypes, qt{
fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), qOOOHead,
Expand Down

0 comments on commit 9215252

Please sign in to comment.