diff --git a/querier.go b/querier.go index 42318e9a..f255a84a 100644 --- a/querier.go +++ b/querier.go @@ -15,6 +15,7 @@ package tsdb import ( "fmt" + "math" "sort" "strings" "unicode/utf8" @@ -55,7 +56,7 @@ type Series interface { // Iterator returns a new iterator of the data of the series. Iterator() SeriesIterator - // ChunkIterator returns a new iterator of the chunks of the series. + // ChunkIterator returns a new iterator for the non-overlapping chunks of the series. ChunkIterator() ChunkIterator } @@ -990,13 +991,11 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator { return newVerticalMergeSeriesIterator(s.series...) } -// ChunkIterator is currently not implemented. -// TODO(bwplotka): Implement once we will want to use chunks in vertical compaction. func (s *verticalChainedSeries) ChunkIterator() ChunkIterator { - return errChunkIterator{err: errors.New("Not Implemented")} + return newVerticalMergeChunkIterator(s.series...) } -// verticalMergeSeriesIterator implements a series iterater over a list +// verticalMergeSeriesIterator implements a series iterator over a list // of time-sorted, time-overlapping iterators. type verticalMergeSeriesIterator struct { a, b SeriesIterator @@ -1076,6 +1075,141 @@ func (it *verticalMergeSeriesIterator) Err() error { return it.b.Err() } +type noSeekSeriesIterator struct { + chunkenc.Iterator + err error +} + +func (it *noSeekSeriesIterator) Seek(t int64) bool { + it.err = errors.New("not implemented: Seek method invoked for noSeekSeriesIterator") + return false +} + +func (it *noSeekSeriesIterator) Err() error { + if it.err != nil { + return it.err + } + return it.Iterator.Err() +} + +// verticalMergeChunkIterator implements a ChunkIterator over a list +// of time-sorted, time-overlapping chunk iterators for the same labels (same series). +// Any overlap in chunks will be merged using verticalMergeSeriesIterator. +type verticalMergeChunkIterator struct { + a, b ChunkIterator + aok, bok, initialized bool + + curMeta chunks.Meta + err error + + aReuseIter, bReuseIter chunkenc.Iterator +} + +func newVerticalMergeChunkIterator(s ...Series) ChunkIterator { + if len(s) == 1 { + return s[0].ChunkIterator() + } else if len(s) == 2 { + return &verticalMergeChunkIterator{ + a: s[0].ChunkIterator(), + b: s[1].ChunkIterator(), + } + } + return &verticalMergeChunkIterator{ + a: s[0].ChunkIterator(), + b: newVerticalMergeChunkIterator(s[1:]...), + } +} + +func (it *verticalMergeChunkIterator) Next() bool { + if !it.initialized { + it.aok = it.a.Next() + it.bok = it.b.Next() + it.initialized = true + } + + if !it.aok && !it.bok { + return false + } + + if !it.aok { + it.curMeta = it.b.At() + it.bok = it.b.Next() + return true + } + if !it.bok { + it.curMeta = it.a.At() + it.aok = it.a.Next() + return true + } + + aCurMeta := it.a.At() + bCurMeta := it.b.At() + + if aCurMeta.MaxTime < bCurMeta.MinTime { + it.curMeta = aCurMeta + it.aok = it.a.Next() + return true + } + + if bCurMeta.MaxTime < aCurMeta.MinTime { + it.curMeta = bCurMeta + it.bok = it.b.Next() + return true + } + + chk := chunkenc.NewXORChunk() + app, err := chk.Appender() + if err != nil { + it.err = err + return false + } + seriesIter := &verticalMergeSeriesIterator{ + a: &noSeekSeriesIterator{Iterator: aCurMeta.Chunk.Iterator(it.aReuseIter)}, + b: &noSeekSeriesIterator{Iterator: bCurMeta.Chunk.Iterator(it.bReuseIter)}, + } + + mint := int64(math.MaxInt64) + maxt := int64(0) + + // TODO: This can end up being up to 240 samples per chunk, so we need to have a case to split to two. + for seriesIter.Next() { + t, v := seriesIter.At() + app.Append(t, v) + + maxt = t + if mint == math.MaxInt64 { + mint = t + } + } + if err := seriesIter.Err(); err != nil { + it.err = err + return false + } + + it.curMeta = chunks.Meta{ + MinTime: mint, + MaxTime: maxt, + Chunk: chk, + } + it.aok = it.a.Next() + it.bok = it.b.Next() + return true +} + +func (it *verticalMergeChunkIterator) At() chunks.Meta { + return it.curMeta +} + +func (it *verticalMergeChunkIterator) Err() error { + if it.err != nil { + return it.err + } + if it.a.Err() != nil { + return it.a.Err() + } + return it.b.Err() +} + // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct {