From b84c4398cfa8541b2c7d5e8b5bf1750fcffcf77b Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Thu, 8 Aug 2019 19:26:56 +0100 Subject: [PATCH] Refactored TestSeriesIterator. * Simplified; merged seek and non seek cases together. Added explicit min/max only for chunk series iterator, where it is relevant. * Adjusted all seek implementation to match edge case requirement (double seek, failed seek + next). Signed-off-by: Bartek Plotka --- chunkenc/chunk.go | 4 +- chunkenc/chunk_test.go | 2 +- querier.go | 65 ++++--- querier_test.go | 431 ++++++++++++++++++----------------------- 4 files changed, 235 insertions(+), 267 deletions(-) diff --git a/chunkenc/chunk.go b/chunkenc/chunk.go index 71325843..f2e3bb85 100644 --- a/chunkenc/chunk.go +++ b/chunkenc/chunk.go @@ -59,9 +59,9 @@ type Appender interface { // Iterator iterates over the data of a time series. type Iterator interface { // Seek advances the iterator forward to the sample with the timestamp t or first value after t. - // If the current iterator points to the sample with timestamp after t already, - // Seek should not advance the iterator. + // If the current iterator points to the sample with timestamp after t already, Seek should not advance the iterator. // Seek returns false if there is no such sample with the timestamp equal or larger than t. + // Iterator can be exhausted when the Seek returns false. Seek(t int64) bool // At returns the current timestamp/value pair. At() (int64, float64) diff --git a/chunkenc/chunk_test.go b/chunkenc/chunk_test.go index 4f8c9f59..84297943 100644 --- a/chunkenc/chunk_test.go +++ b/chunkenc/chunk_test.go @@ -107,7 +107,7 @@ func testChunk(t *testing.T, c Chunk) { testutil.Ok(t, it3.Err()) testutil.Equals(t, all[mid:], res3) - testutil.Equals(t, false, it3.Seek(all[len(all)-1].t + 1)) + testutil.Equals(t, false, it3.Seek(all[len(all)-1].t+1)) } func benchmarkIterator(b *testing.B, newChunk func() Chunk) { diff --git a/querier.go b/querier.go index a39b41d7..664d2605 100644 --- a/querier.go +++ b/querier.go @@ -60,6 +60,19 @@ type Series interface { ChunkIterator() ChunkIterator } +// ChunkIterator iterates over the chunk of a time series. +type ChunkIterator interface { + // Seek advances the iterator forward to the given timestamp. + // It advances to the chunk with min time at t or first chunk with min time after t. + Seek(t int64) bool + // At returns the meta. + At() chunks.Meta + // Next advances the iterator by one. + Next() bool + // Err returns optional error if Next is false. + Err() error +} + // querier aggregates querying results from time blocks within // a single partition. type querier struct { @@ -942,7 +955,7 @@ func (it *chainedSeriesIterator) Next() bool { if it.cur.Next() { return true } - if err := it.cur.Err(); err != nil { + if it.cur.Err() != nil { return false } if it.i == len(it.series)-1 { @@ -1007,6 +1020,10 @@ func newVerticalMergeSeriesIterator(s ...Series) chunkenc.Iterator { } func (it *verticalMergeSeriesIterator) Seek(t int64) bool { + if it.initialized && it.curT >= t { + return true + } + it.aok, it.bok = it.a.Seek(t), it.b.Seek(t) it.initialized = true return it.Next() @@ -1157,7 +1174,7 @@ func mergeOverlappingChunks(a, b chunks.Meta, aReuseIter, bReuseIter chunkenc.It } mint := int64(math.MaxInt64) - maxt := int64(0) + maxt := int64(math.MinInt64) // TODO: This can end up being up to 240 samples per chunk, so we need to have a case to split to two. for seriesIter.Next() { @@ -1243,31 +1260,40 @@ func (it *chunkSeriesIterator) resetCurIterator() { it.cur = it.bufDelIter } -func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { - if t > it.maxt { +func (it *chunkSeriesIterator) Seek(t int64) bool { + if it.Err() != nil || t > it.maxt || it.i > len(it.chunks)-1 { + // Exhaust iterator. + it.i = len(it.chunks) return false } - // Seek to the first valid value after t. if t < it.mint { t = it.mint } + currI := it.i for ; it.chunks[it.i].MaxTime < t; it.i++ { if it.i == len(it.chunks)-1 { + // Exhaust iterator. + it.i = len(it.chunks) return false } } - it.resetCurIterator() + if currI != it.i { + it.resetCurIterator() + } - for it.cur.Next() { - t0, _ := it.cur.At() - if t0 >= t { - return true + tc, _ := it.cur.At() + for t > tc { + if !it.cur.Next() { + // Exhaust iterator. + it.i = len(it.chunks) + return false } + tc, _ = it.cur.At() } - return false + return true } func (it *chunkSeriesIterator) At() (t int64, v float64) { @@ -1275,6 +1301,10 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) { } func (it *chunkSeriesIterator) Next() bool { + if it.Err() != nil || it.i > len(it.chunks)-1 { + return false + } + if it.cur.Next() { t, _ := it.cur.At() @@ -1369,19 +1399,6 @@ func (s errSeriesSet) Next() bool { return false } func (s errSeriesSet) At() Series { return nil } func (s errSeriesSet) Err() error { return s.err } -// ChunkIterator iterates over the chunk of a time series. -type ChunkIterator interface { - // Seek advances the iterator forward to the given timestamp. - // It advances to the chunk with min time at t or first chunk with min time after t. - Seek(t int64) bool - // At returns the meta. - At() chunks.Meta - // Next advances the iterator by one. - Next() bool - // Err returns optional error if Next is false. - Err() error -} - type chunkIterator struct { chunks []chunks.Meta // series in time order i int diff --git a/querier_test.go b/querier_test.go index 5ef27b75..38925a90 100644 --- a/querier_test.go +++ b/querier_test.go @@ -672,22 +672,45 @@ func (s itSeries) Iterator() chunkenc.Iterator { return s.si } func (s itSeries) Labels() labels.Labels { return labels.Labels{} } func (s itSeries) ChunkIterator() ChunkIterator { return nil } -func TestSeriesIterator(t *testing.T) { - itcases := []struct { - a, b, c []tsdbutil.Sample - exp []tsdbutil.Sample +type iteratorCase struct { + a, b, c, expected []tsdbutil.Sample - mint, maxt int64 - }{ - { - a: []tsdbutil.Sample{}, - b: []tsdbutil.Sample{}, - c: []tsdbutil.Sample{}, + mint, maxt int64 + + // seek is zero means do not test seek. + seek int64 + seekSuccess bool +} + +func (tc iteratorCase) test(t *testing.T, it chunkenc.Iterator) { + var r []tsdbutil.Sample + if tc.seek != 0 { + testutil.Equals(t, tc.seekSuccess, it.Seek(tc.seek)) + testutil.Equals(t, tc.seekSuccess, it.Seek(tc.seek)) // Next one should be noop. + + if tc.seekSuccess { + // After successful seek iterator is ready. Grab the value. + t, v := it.At() + r = append(r, sample{t: t, v: v}) + } + } + expandedResult, err := expandSeriesIterator(it) + testutil.Ok(t, err) - exp: []tsdbutil.Sample{}, + r = append(r, expandedResult...) + testutil.Equals(t, tc.expected, r) +} +func TestSeriesIterator(t *testing.T) { + cases := []iteratorCase{ + { + a: []tsdbutil.Sample{}, + b: []tsdbutil.Sample{}, + c: []tsdbutil.Sample{}, mint: math.MinInt64, maxt: math.MaxInt64, + + expected: nil, }, { a: []tsdbutil.Sample{ @@ -700,27 +723,12 @@ func TestSeriesIterator(t *testing.T) { c: []tsdbutil.Sample{ sample{7, 89}, sample{9, 8}, }, - - exp: []tsdbutil.Sample{ - sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, - }, mint: math.MinInt64, maxt: math.MaxInt64, - }, - { - a: []tsdbutil.Sample{}, - b: []tsdbutil.Sample{ - sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, - }, - c: []tsdbutil.Sample{ - sample{7, 89}, sample{9, 8}, - }, - exp: []tsdbutil.Sample{ + expected: []tsdbutil.Sample{ sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, }, - mint: 2, - maxt: 8, }, { a: []tsdbutil.Sample{ @@ -732,32 +740,22 @@ func TestSeriesIterator(t *testing.T) { c: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, + mint: math.MinInt64, + maxt: math.MaxInt64, - exp: []tsdbutil.Sample{ + expected: []tsdbutil.Sample{ sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 22}, sample{203, 3493}, }, - mint: 6, - maxt: 10, }, - } - - seekcases := []struct { - a, b, c []tsdbutil.Sample - - seek int64 - success bool - exp []tsdbutil.Sample - - mint, maxt int64 - }{ + // Seek cases. { - a: []tsdbutil.Sample{}, - b: []tsdbutil.Sample{}, - c: []tsdbutil.Sample{}, + a: []tsdbutil.Sample{}, + b: []tsdbutil.Sample{}, + c: []tsdbutil.Sample{}, + seek: 1, - seek: 0, - success: false, - exp: nil, + seekSuccess: false, + expected: nil, }, { a: []tsdbutil.Sample{ @@ -767,12 +765,12 @@ func TestSeriesIterator(t *testing.T) { c: []tsdbutil.Sample{ sample{7, 89}, sample{9, 8}, }, + seek: 10, + mint: math.MinInt64, + maxt: math.MaxInt64, - seek: 10, - success: false, - exp: nil, - mint: math.MinInt64, - maxt: math.MaxInt64, + seekSuccess: false, + expected: nil, }, { a: []tsdbutil.Sample{}, @@ -782,14 +780,14 @@ func TestSeriesIterator(t *testing.T) { c: []tsdbutil.Sample{ sample{7, 89}, sample{9, 8}, }, + seek: 2, + mint: math.MinInt64, + maxt: math.MaxInt64, - seek: 2, - success: true, - exp: []tsdbutil.Sample{ + seekSuccess: true, + expected: []tsdbutil.Sample{ sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, }, - mint: 5, - maxt: 8, }, { a: []tsdbutil.Sample{ @@ -801,14 +799,14 @@ func TestSeriesIterator(t *testing.T) { c: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, + seek: 10, + mint: math.MinInt64, + maxt: math.MaxInt64, - seek: 10, - success: true, - exp: []tsdbutil.Sample{ + seekSuccess: true, + expected: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, - mint: 10, - maxt: 203, }, { a: []tsdbutil.Sample{ @@ -820,133 +818,143 @@ func TestSeriesIterator(t *testing.T) { c: []tsdbutil.Sample{ sample{10, 22}, sample{203, 3493}, }, + seek: 203, + mint: math.MinInt64, + maxt: math.MaxInt64, - seek: 203, - success: true, - exp: []tsdbutil.Sample{ + seekSuccess: true, + expected: []tsdbutil.Sample{ sample{203, 3493}, }, - mint: 7, - maxt: 203, }, } t.Run("Chunk", func(t *testing.T) { - for _, tc := range itcases { - chkMetas := []chunks.Meta{ - tsdbutil.ChunkFromSamples(tc.a), - tsdbutil.ChunkFromSamples(tc.b), - tsdbutil.ChunkFromSamples(tc.c), - } - res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) - - smplValid := make([]tsdbutil.Sample, 0) - for _, s := range tc.exp { - if s.T() >= tc.mint && s.T() <= tc.maxt { - smplValid = append(smplValid, tsdbutil.Sample(s)) - } - } - exp := newListSeriesIterator(smplValid) - - smplExp, errExp := expandSeriesIterator(exp) - smplRes, errRes := expandSeriesIterator(res) - - testutil.Equals(t, errExp, errRes) - testutil.Equals(t, smplExp, smplRes) - } - - t.Run("Seek", func(t *testing.T) { - extra := []struct { - a, b, c []tsdbutil.Sample + // Only chunk implements time filtering, so add those cases here only. + limitedMinMaxtCases := []iteratorCase{ + { + a: []tsdbutil.Sample{}, + b: []tsdbutil.Sample{}, + c: []tsdbutil.Sample{}, + mint: 20, + maxt: 21, - seek int64 - success bool - exp []tsdbutil.Sample + expected: nil, + }, + { + a: []tsdbutil.Sample{ + sample{1, 2}, + sample{2, 3}, + sample{3, 5}, + sample{6, 1}, + }, + b: []tsdbutil.Sample{}, + c: []tsdbutil.Sample{ + sample{7, 89}, sample{9, 8}, + }, + mint: 2, + maxt: 8, - mint, maxt int64 - }{ - { - a: []tsdbutil.Sample{ - sample{6, 1}, - }, - b: []tsdbutil.Sample{ - sample{9, 8}, - }, - c: []tsdbutil.Sample{ - sample{10, 22}, sample{203, 3493}, - }, + expected: []tsdbutil.Sample{ + sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, + }, + }, + { + a: []tsdbutil.Sample{ + sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, + }, + b: []tsdbutil.Sample{ + sample{7, 89}, sample{9, 8}, + }, + c: []tsdbutil.Sample{ + sample{10, 22}, sample{203, 3493}, + }, + mint: 3, + maxt: math.MaxInt64, - seek: 203, - success: false, - exp: nil, - mint: 2, - maxt: 202, + expected: []tsdbutil.Sample{ + sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 22}, sample{203, 3493}, }, - { - a: []tsdbutil.Sample{ - sample{6, 1}, - }, - b: []tsdbutil.Sample{ - sample{9, 8}, - }, - c: []tsdbutil.Sample{ - sample{10, 22}, sample{203, 3493}, - }, + }, + { + a: []tsdbutil.Sample{ + sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, + }, + b: []tsdbutil.Sample{ + sample{7, 89}, sample{9, 8}, + }, + c: []tsdbutil.Sample{ + sample{10, 22}, sample{203, 3493}, + }, + mint: 6, + maxt: 10, - seek: 5, - success: true, - exp: []tsdbutil.Sample{sample{10, 22}}, - mint: 10, - maxt: 202, + expected: []tsdbutil.Sample{ + sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 22}, }, - } + }, + // Same with seek. + { + a: []tsdbutil.Sample{ + sample{6, 1}, + }, + b: []tsdbutil.Sample{ + sample{9, 8}, + }, + c: []tsdbutil.Sample{ + sample{10, 22}, sample{203, 3493}, + }, + seek: 203, + mint: 2, + maxt: 202, - seekcases2 := append(seekcases, extra...) + seekSuccess: false, + expected: nil, + }, + { + a: []tsdbutil.Sample{ + sample{6, 1}, + }, + b: []tsdbutil.Sample{ + sample{9, 8}, + }, + c: []tsdbutil.Sample{ + sample{10, 22}, sample{203, 3493}, + }, + seek: 5, + mint: 10, + maxt: 202, - for _, tc := range seekcases2 { + seekSuccess: true, + expected: []tsdbutil.Sample{sample{10, 22}}, + }, + } + for i, tc := range append(cases, limitedMinMaxtCases...) { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { chkMetas := []chunks.Meta{ tsdbutil.ChunkFromSamples(tc.a), tsdbutil.ChunkFromSamples(tc.b), tsdbutil.ChunkFromSamples(tc.c), } - res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) - - smplValid := make([]tsdbutil.Sample, 0) - for _, s := range tc.exp { - if s.T() >= tc.mint && s.T() <= tc.maxt { - smplValid = append(smplValid, tsdbutil.Sample(s)) - } - } - exp := newListSeriesIterator(smplValid) - - testutil.Equals(t, tc.success, res.Seek(tc.seek)) - - if tc.success { - // Init the list and then proceed to check. - remaining := exp.Next() - testutil.Assert(t, remaining, "") - - for remaining { - sExp, eExp := exp.At() - sRes, eRes := res.At() - testutil.Equals(t, eExp, eRes) - testutil.Equals(t, sExp, sRes) - - remaining = exp.Next() - testutil.Equals(t, remaining, res.Next()) - } - } - } - }) + tc.test(t, newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt)) + }) + } }) t.Run("Chain", func(t *testing.T) { + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + a, b, c := itSeries{newListSeriesIterator(tc.a)}, + itSeries{newListSeriesIterator(tc.b)}, + itSeries{newListSeriesIterator(tc.c)} + tc.test(t, newChainedSeriesIterator(a, b, c)) + }) + } + }) + + t.Run("Vertical", func(t *testing.T) { // Extra cases for overlapping series. - itcasesExtra := []struct { - a, b, c []tsdbutil.Sample - exp []tsdbutil.Sample - mint, maxt int64 - }{ + overlappingCases := []iteratorCase{ { a: []tsdbutil.Sample{ sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, @@ -957,12 +965,12 @@ func TestSeriesIterator(t *testing.T) { c: []tsdbutil.Sample{ sample{2, 33}, sample{4, 44}, sample{10, 3}, }, + mint: math.MinInt64, + maxt: math.MaxInt64, - exp: []tsdbutil.Sample{ + expected: []tsdbutil.Sample{ sample{1, 2}, sample{2, 33}, sample{3, 5}, sample{4, 44}, sample{5, 49}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 3}, }, - mint: math.MinInt64, - maxt: math.MaxInt64, }, { a: []tsdbutil.Sample{ @@ -972,83 +980,23 @@ func TestSeriesIterator(t *testing.T) { c: []tsdbutil.Sample{ sample{1, 23}, sample{2, 342}, sample{3, 25}, sample{6, 11}, }, + mint: math.MinInt64, + maxt: math.MaxInt64, - exp: []tsdbutil.Sample{ + expected: []tsdbutil.Sample{ sample{1, 23}, sample{2, 342}, sample{3, 25}, sample{6, 11}, sample{9, 5}, sample{13, 1}, }, - mint: math.MinInt64, - maxt: math.MaxInt64, }, } - - for _, tc := range itcases { - a, b, c := itSeries{newListSeriesIterator(tc.a)}, - itSeries{newListSeriesIterator(tc.b)}, - itSeries{newListSeriesIterator(tc.c)} - - res := newChainedSeriesIterator(a, b, c) - exp := newListSeriesIterator([]tsdbutil.Sample(tc.exp)) - - smplExp, errExp := expandSeriesIterator(exp) - smplRes, errRes := expandSeriesIterator(res) - - testutil.Equals(t, errExp, errRes) - testutil.Equals(t, smplExp, smplRes) - } - - for _, tc := range append(itcases, itcasesExtra...) { - a, b, c := itSeries{newListSeriesIterator(tc.a)}, - itSeries{newListSeriesIterator(tc.b)}, - itSeries{newListSeriesIterator(tc.c)} - - res := newVerticalMergeSeriesIterator(a, b, c) - exp := newListSeriesIterator([]tsdbutil.Sample(tc.exp)) - - smplExp, errExp := expandSeriesIterator(exp) - smplRes, errRes := expandSeriesIterator(res) - - testutil.Equals(t, errExp, errRes) - testutil.Equals(t, smplExp, smplRes) + for i, tc := range append(cases, overlappingCases...) { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + a, b, c := + itSeries{newListSeriesIterator(tc.a)}, + itSeries{newListSeriesIterator(tc.b)}, + itSeries{newListSeriesIterator(tc.c)} + tc.test(t, newVerticalMergeSeriesIterator(a, b, c)) + }) } - - t.Run("Seek", func(t *testing.T) { - for _, tc := range seekcases { - ress := []chunkenc.Iterator{ - newChainedSeriesIterator( - itSeries{newListSeriesIterator(tc.a)}, - itSeries{newListSeriesIterator(tc.b)}, - itSeries{newListSeriesIterator(tc.c)}, - ), - newVerticalMergeSeriesIterator( - itSeries{newListSeriesIterator(tc.a)}, - itSeries{newListSeriesIterator(tc.b)}, - itSeries{newListSeriesIterator(tc.c)}, - ), - } - - for _, res := range ress { - exp := newListSeriesIterator(tc.exp) - - testutil.Equals(t, tc.success, res.Seek(tc.seek)) - - if tc.success { - // Init the list and then proceed to check. - remaining := exp.Next() - testutil.Assert(t, remaining, "") - - for remaining { - sExp, eExp := exp.At() - sRes, eRes := res.At() - testutil.Equals(t, eExp, eRes) - testutil.Equals(t, sExp, sRes) - - remaining = exp.Next() - testutil.Equals(t, remaining, res.Next()) - } - } - } - } - }) }) } @@ -1103,6 +1051,7 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { res := newChunkSeriesIterator(chkMetas, nil, 2, 8) testutil.Assert(t, res.Seek(1), "") testutil.Assert(t, res.Seek(2), "") + testutil.Assert(t, res.Seek(2), "") ts, v := res.At() testutil.Equals(t, int64(2), ts) testutil.Equals(t, float64(2), v) @@ -1514,11 +1463,13 @@ func (it *listSeriesIterator) Seek(t int64) bool { if it.idx == -1 { it.idx = 0 } + // Do binary search between current position and end. - it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { + pos := sort.Search(len(it.list)-it.idx, func(i int) bool { s := it.list[i+it.idx] return s.T() >= t }) + it.idx += pos return it.idx < len(it.list) }