diff --git a/tsdb/db.go b/tsdb/db.go index 64e1158d41b..3b1dee27d40 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -2060,6 +2060,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) var headQuerier storage.Querier + inoMint := mint if maxt >= db.head.MinTime() || overlapsOOO { rh := NewRangeHead(db.head, mint, maxt) var err error @@ -2084,13 +2085,14 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { if err != nil { return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err) } + inoMint = newMint } } if overlapsOOO { // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) - headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier) + headQuerier = NewHeadAndOOOQuerier(inoMint, mint, maxt, db.head, isoState, headQuerier) } if headQuerier != nil { @@ -2136,6 +2138,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) var headQuerier storage.ChunkQuerier + inoMint := mint if maxt >= db.head.MinTime() || overlapsOOO { rh := NewRangeHead(db.head, mint, maxt) headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt) @@ -2159,13 +2162,14 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer if err != nil { return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err) } + inoMint = newMint } } if overlapsOOO { // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) - headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier) + headQuerier = NewHeadAndOOOChunkQuerier(inoMint, mint, maxt, db.head, isoState, headQuerier) } if headQuerier != nil { diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 746d38a65bc..26cd4d057e9 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -35,6 +35,7 @@ var _ IndexReader = &HeadAndOOOIndexReader{} type HeadAndOOOIndexReader struct { *headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible. + inoMint int64 lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef } @@ -49,13 +50,13 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables) } -func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader { +func NewHeadAndOOOIndexReader(head *Head, inoMint, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader { hr := &headIndexReader{ head: head, mint: mint, maxt: maxt, } - return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef} + return &HeadAndOOOIndexReader{hr, inoMint, lastGarbageCollectedMmapRef} } func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { @@ -76,9 +77,9 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S *chks = (*chks)[:0] if s.ooo != nil { - return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks) + return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks) } - *chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks) + *chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks) return nil } @@ -87,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S // // maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then // the oooHeadChunk will not be considered. -func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error { +func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { @@ -128,7 +129,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap } if includeInOrder { - tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks) + tmpChks = appendSeriesChunks(s, inoMint, maxt, tmpChks) } // There is nothing to do if we did not collect any chunk. @@ -476,7 +477,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l return nil } - return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, chks) + return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks) } func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { @@ -516,7 +517,7 @@ type HeadAndOOOQuerier struct { querier storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the mean time, in which case we ignore it and not close it in the end. } -func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { +func NewHeadAndOOOQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { cr := &headChunkReader{ head: head, mint: mint, @@ -527,7 +528,7 @@ func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolatio mint: mint, maxt: maxt, head: head, - index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), + index: NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef), chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), querier: querier, } @@ -568,7 +569,7 @@ type HeadAndOOOChunkQuerier struct { querier storage.ChunkQuerier } -func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier { +func NewHeadAndOOOChunkQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier { cr := &headChunkReader{ head: head, mint: mint, @@ -579,7 +580,7 @@ func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIso mint: mint, maxt: maxt, head: head, - index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), + index: NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef), chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), querier: querier, } diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index b5944f6c8ad..8d1527e05c0 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -360,7 +360,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { }) } - ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMaxT, 0) + ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder @@ -451,7 +451,7 @@ func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenari for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { // We first want to test using a head index reader that covers the biggest query interval - oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMaxT, 0) + oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0) matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} values, err := oh.LabelValues(ctx, "foo", matchers...) sort.Strings(values) @@ -857,7 +857,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { // The Series method populates the chunk metas, taking a copy of the // head OOO chunk if necessary. These are then used by the ChunkReader. - ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) + ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder err = ir.Series(s1Ref, &b, &chks) @@ -1028,7 +1028,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( // The Series method populates the chunk metas, taking a copy of the // head OOO chunk if necessary. These are then used by the ChunkReader. - ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) + ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder err = ir.Series(s1Ref, &b, &chks) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 9ec807f803d..77772937a72 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -3235,7 +3235,7 @@ func BenchmarkQueries(b *testing.B) { qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples) require.NoError(b, err) isoState := head.oooIso.TrackReadAfter(0) - qOOOHead := NewHeadAndOOOQuerier(1, nSamples, head, isoState, qHead) + qOOOHead := NewHeadAndOOOQuerier(1, 1, nSamples, head, isoState, qHead) queryTypes = append(queryTypes, qt{ fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), qOOOHead,