diff --git a/querier.go b/querier.go index b5b9ae05..7d542877 100644 --- a/querier.go +++ b/querier.go @@ -669,7 +669,7 @@ func (s *chunkSeries) Iterator() SeriesIterator { type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. // If there's no value exactly at t, it advances to the first value - // after t. + // after t. Has no effect if already past t. Seek(t int64) bool // At returns the current timestamp/value pair. At() (t int64, v float64) @@ -783,6 +783,9 @@ func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int6 func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { if t > it.maxt { + it.i = len(it.chunks) + } + if it.cur.Err() != nil || it.i >= len(it.chunks) { return false } @@ -791,31 +794,48 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { t = it.mint } - for ; it.chunks[it.i].MaxTime < t; it.i++ { - if it.i == len(it.chunks)-1 { - return false - } + // Return early if already past t. + if t0, _ := it.cur.At(); t0 >= t { + return t0 <= it.maxt } - it.cur = it.chunks[it.i].Chunk.Iterator() - if len(it.intervals) > 0 { - it.cur = &deletedIterator{it: it.cur, intervals: it.intervals} - } + iCur := it.i + for ; it.i < len(it.chunks); it.i++ { + // Skip chunks with MaxTime < t. + if it.chunks[it.i].MaxTime >= t { + // Don't reset the iterator unless we've moved on to a different chunk. + if it.i != iCur { + it.cur = it.chunks[it.i].Chunk.Iterator() + if len(it.intervals) > 0 { + it.cur = &deletedIterator{it: it.cur, intervals: it.intervals} + } + } - for it.cur.Next() { - t0, _ := it.cur.At() - if t0 >= t { - return true + for it.cur.Next() { + t0, _ := it.cur.At() + if t0 > it.maxt || it.cur.Err() != nil { + return false + } + if t0 >= t { + return true + } + } } } return false } func (it *chunkSeriesIterator) At() (t int64, v float64) { + // Should it panic if it.cur.Err() != nil || it.i >= len(it.chunks)? + // What about before Next() or Seek() were called? return it.cur.At() } func (it *chunkSeriesIterator) Next() bool { + if it.cur.Err() != nil || it.i >= len(it.chunks) { + return false + } + if it.cur.Next() { t, _ := it.cur.At() @@ -824,22 +844,18 @@ func (it *chunkSeriesIterator) Next() bool { return false } t, _ = it.At() - - return t <= it.maxt - } - if t > it.maxt { - return false } - return true + + return t <= it.maxt } if err := it.cur.Err(); err != nil { return false } - if it.i == len(it.chunks)-1 { - return false - } it.i++ + if it.i == len(it.chunks) { + return false + } it.cur = it.chunks[it.i].Chunk.Iterator() if len(it.intervals) > 0 { it.cur = &deletedIterator{it: it.cur, intervals: it.intervals} diff --git a/querier_test.go b/querier_test.go index 2eb10471..a2f5c24b 100644 --- a/querier_test.go +++ b/querier_test.go @@ -390,6 +390,16 @@ func TestBlockQuerier(t *testing.T) { }, }, }, + { + lset: map[string]string{ + "s": "s", + }, + chunks: [][]sample{ + { + {1, 2}, {10, 11}, + }, + }, + }, }, queries: []query{ @@ -448,6 +458,18 @@ func TestBlockQuerier(t *testing.T) { ), }), }, + { + mint: 2, + maxt: 9, + ms: []labels.Matcher{labels.NewEqualMatcher("s", "s")}, + exp: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "s": "s", + }, + []sample{}, + ), + }), + }, }, } @@ -558,8 +580,8 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, tombstones: memTombstones{ - 1: Intervals{{1, 3}}, - 2: Intervals{{1, 3}, {6, 10}}, + 1: Intervals{{1, 2}}, + 2: Intervals{{2, 3}, {6, 10}}, 3: Intervals{{6, 10}}, }, @@ -572,7 +594,7 @@ func TestBlockQuerierDelete(t *testing.T) { newSeries(map[string]string{ "a": "a", }, - []sample{{5, 2}, {6, 3}, {7, 4}}, + []sample{{3, 4}, {5, 2}, {6, 3}, {7, 4}}, ), newSeries(map[string]string{ "a": "a", @@ -605,19 +627,36 @@ func TestBlockQuerierDelete(t *testing.T) { maxt: 4, ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, exp: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + }, + []sample{{3, 4}}, + ), newSeries(map[string]string{ "a": "a", "b": "b", }, - []sample{{4, 15}}, + []sample{{1, 1}, {4, 15}}, ), }), }, { mint: 1, - maxt: 3, + maxt: 2, ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, - exp: newListSeriesSet([]Series{}), + exp: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + }, + []sample{}, + ), + newSeries(map[string]string{ + "a": "a", + "b": "b", + }, + []sample{{1, 1}}, + ), + }), }, }, } @@ -866,7 +905,7 @@ func TestSeriesIterator(t *testing.T) { b: []sample{}, c: []sample{}, - seek: 0, + seek: 1, success: false, exp: nil, }, @@ -987,7 +1026,7 @@ func TestSeriesIterator(t *testing.T) { {10, 22}, {203, 3493}, }, - seek: 203, + seek: 101, success: false, exp: nil, mint: 2, @@ -1038,10 +1077,10 @@ func TestSeriesIterator(t *testing.T) { testutil.Assert(t, remaining == true, "") for remaining { - sExp, eExp := exp.At() - sRes, eRes := res.At() - testutil.Equals(t, eExp, eRes) - testutil.Equals(t, sExp, sRes) + tExp, vExp := exp.At() + tRes, vRes := res.At() + testutil.Equals(t, tExp, tRes) + testutil.Equals(t, vExp, vRes) remaining = exp.Next() testutil.Equals(t, remaining, res.Next()) @@ -1084,10 +1123,10 @@ func TestSeriesIterator(t *testing.T) { testutil.Assert(t, remaining == true, "") for remaining { - sExp, eExp := exp.At() - sRes, eRes := res.At() - testutil.Equals(t, eExp, eRes) - testutil.Equals(t, sExp, sRes) + tExp, vExp := exp.At() + tRes, vRes := res.At() + testutil.Equals(t, tExp, tRes) + testutil.Equals(t, vExp, vRes) remaining = exp.Next() testutil.Equals(t, remaining, res.Next()) @@ -1116,6 +1155,24 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { testutil.Equals(t, float64(2), v) } +// Regression for: https://github.com/prometheus/tsdb/pull/327 +// Seek would go back within the same chunk. +func TestChunkSeriesIterator_DoubleSeekBackwards(t *testing.T) { + chkMetas := []chunks.Meta{ + chunkFromSamples([]sample{}), + chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}), + chunkFromSamples([]sample{{4, 4}, {5, 5}}), + } + + // Seek for 3, then 2. Iterator should remain positioned on 3. + res := newChunkSeriesIterator(chkMetas, nil, 2, 8) + testutil.Assert(t, res.Seek(3) == true, "") + testutil.Assert(t, res.Seek(2) == true, "") + ts, v := res.At() + testutil.Equals(t, int64(3), ts) + testutil.Equals(t, float64(3), v) +} + // Regression when seeked chunks were still found via binary search and we always // skipped to the end when seeking a value in the current chunk. func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { @@ -1149,6 +1206,20 @@ func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) { testutil.Assert(t, it.Next() == false, "") } +// Regression for: https://github.com/prometheus/tsdb/pull/327 +// Calling Seek() with a time between [mint, maxt] after the iterator had +// already passed the end would incorrectly return true. +func TestChunkSeriesIterator_SeekWithMinTime(t *testing.T) { + metas := []chunks.Meta{ + chunkFromSamples([]sample{{1, 6}, {5, 6}, {7, 8}}), + } + + it := newChunkSeriesIterator(metas, nil, 2, 5) + testutil.Assert(t, it.Seek(6) == false, "") + // A second, within bounds Seek() used to succeed. Make sure it also returns false. + testutil.Assert(t, it.Seek(3) == false, "") +} + func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})} chunkMetas := [][]chunks.Meta{