From 9215252221529d3ed56b5bf019e14acf20e4b618 Mon Sep 17 00:00:00 2001
From: Bryan Boreham <bjboreham@gmail.com>
Date: Fri, 20 Sep 2024 17:40:17 +0100
Subject: [PATCH] [BUGFIX] TSDB: Only query chunks up to truncation time
 (#14948)

If the query overlaps the range currently undergoing compaction, we
should only fetch chunks up to that time. Need to store that min time
in `HeadAndOOOIndexReader`.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
---
 tsdb/db.go                 |  8 ++++++--
 tsdb/ooo_head_read.go      | 23 ++++++++++++-----------
 tsdb/ooo_head_read_test.go |  8 ++++----
 tsdb/querier_test.go       |  2 +-
 4 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/tsdb/db.go b/tsdb/db.go
index 64e1158d41b..3b1dee27d40 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -2060,6 +2060,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
 
 	overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
 	var headQuerier storage.Querier
+	inoMint := mint
 	if maxt >= db.head.MinTime() || overlapsOOO {
 		rh := NewRangeHead(db.head, mint, maxt)
 		var err error
@@ -2084,13 +2085,14 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
 			if err != nil {
 				return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
 			}
+			inoMint = newMint
 		}
 	}
 
 	if overlapsOOO {
 		// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
 		isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
-		headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier)
+		headQuerier = NewHeadAndOOOQuerier(inoMint, mint, maxt, db.head, isoState, headQuerier)
 	}
 
 	if headQuerier != nil {
@@ -2136,6 +2138,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
 
 	overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
 	var headQuerier storage.ChunkQuerier
+	inoMint := mint
 	if maxt >= db.head.MinTime() || overlapsOOO {
 		rh := NewRangeHead(db.head, mint, maxt)
 		headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt)
@@ -2159,13 +2162,14 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
 			if err != nil {
 				return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
 			}
+			inoMint = newMint
 		}
 	}
 
 	if overlapsOOO {
 		// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
 		isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
-		headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier)
+		headQuerier = NewHeadAndOOOChunkQuerier(inoMint, mint, maxt, db.head, isoState, headQuerier)
 	}
 
 	if headQuerier != nil {
diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go
index 746d38a65bc..26cd4d057e9 100644
--- a/tsdb/ooo_head_read.go
+++ b/tsdb/ooo_head_read.go
@@ -35,6 +35,7 @@ var _ IndexReader = &HeadAndOOOIndexReader{}
 
 type HeadAndOOOIndexReader struct {
 	*headIndexReader            // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
+	inoMint                     int64
 	lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
 }
 
@@ -49,13 +50,13 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator
 	return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables)
 }
 
-func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
+func NewHeadAndOOOIndexReader(head *Head, inoMint, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
 	hr := &headIndexReader{
 		head: head,
 		mint: mint,
 		maxt: maxt,
 	}
-	return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef}
+	return &HeadAndOOOIndexReader{hr, inoMint, lastGarbageCollectedMmapRef}
 }
 
 func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
@@ -76,9 +77,9 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
 	*chks = (*chks)[:0]
 
 	if s.ooo != nil {
-		return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks)
+		return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
 	}
-	*chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks)
+	*chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks)
 	return nil
 }
 
@@ -87,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
 //
 // maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
 // the oooHeadChunk will not be considered.
-func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error {
+func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
 	tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
 
 	addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
@@ -128,7 +129,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
 	}
 
 	if includeInOrder {
-		tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks)
+		tmpChks = appendSeriesChunks(s, inoMint, maxt, tmpChks)
 	}
 
 	// There is nothing to do if we did not collect any chunk.
@@ -476,7 +477,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
 		return nil
 	}
 
-	return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, chks)
+	return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
 }
 
 func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
@@ -516,7 +517,7 @@ type HeadAndOOOQuerier struct {
 	querier    storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the mean time, in which case we ignore it and not close it in the end.
 }
 
-func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
+func NewHeadAndOOOQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
 	cr := &headChunkReader{
 		head:     head,
 		mint:     mint,
@@ -527,7 +528,7 @@ func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolatio
 		mint:    mint,
 		maxt:    maxt,
 		head:    head,
-		index:   NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
+		index:   NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef),
 		chunkr:  NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
 		querier: querier,
 	}
@@ -568,7 +569,7 @@ type HeadAndOOOChunkQuerier struct {
 	querier    storage.ChunkQuerier
 }
 
-func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier {
+func NewHeadAndOOOChunkQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier {
 	cr := &headChunkReader{
 		head:     head,
 		mint:     mint,
@@ -579,7 +580,7 @@ func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIso
 		mint:    mint,
 		maxt:    maxt,
 		head:    head,
-		index:   NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
+		index:   NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef),
 		chunkr:  NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
 		querier: querier,
 	}
diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go
index b5944f6c8ad..8d1527e05c0 100644
--- a/tsdb/ooo_head_read_test.go
+++ b/tsdb/ooo_head_read_test.go
@@ -360,7 +360,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
 						})
 					}
 
-					ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMaxT, 0)
+					ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
 
 					var chks []chunks.Meta
 					var b labels.ScratchBuilder
@@ -451,7 +451,7 @@ func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenari
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
 			// We first want to test using a head index reader that covers the biggest query interval
-			oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMaxT, 0)
+			oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
 			matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
 			values, err := oh.LabelValues(ctx, "foo", matchers...)
 			sort.Strings(values)
@@ -857,7 +857,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
 
 			// The Series method populates the chunk metas, taking a copy of the
 			// head OOO chunk if necessary. These are then used by the ChunkReader.
-			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
+			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
 			var chks []chunks.Meta
 			var b labels.ScratchBuilder
 			err = ir.Series(s1Ref, &b, &chks)
@@ -1028,7 +1028,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
 
 			// The Series method populates the chunk metas, taking a copy of the
 			// head OOO chunk if necessary. These are then used by the ChunkReader.
-			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
+			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
 			var chks []chunks.Meta
 			var b labels.ScratchBuilder
 			err = ir.Series(s1Ref, &b, &chks)
diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go
index 9ec807f803d..77772937a72 100644
--- a/tsdb/querier_test.go
+++ b/tsdb/querier_test.go
@@ -3235,7 +3235,7 @@ func BenchmarkQueries(b *testing.B) {
 					qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples)
 					require.NoError(b, err)
 					isoState := head.oooIso.TrackReadAfter(0)
-					qOOOHead := NewHeadAndOOOQuerier(1, nSamples, head, isoState, qHead)
+					qOOOHead := NewHeadAndOOOQuerier(1, 1, nSamples, head, isoState, qHead)
 
 					queryTypes = append(queryTypes, qt{
 						fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), qOOOHead,