Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Add ChunksIterator method to Series interface. #665

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
## master / unreleased

- [CHANGE] `chunks.MergeOverlappingChunks` moved to `tsdb.MergeOverlappingChunks`

## 0.10.0

- [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode.
- `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s.
- `BlockReader` interface - removed MinTime/MaxTime methods and now exposes the full block meta via `Meta()`.
- [FEATURE] `chunkenc.Chunk.Iterator` method now takes a `chunkenc.Iterator` interface as an argument for reuse.
- [CHANGE] `Series` interface now has a `ChunksIterator` method that returns an iterator over the series' encoded chunks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong paragraph?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

failed rebase


## 0.9.1

Expand Down
10 changes: 9 additions & 1 deletion chunkenc/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,16 @@ type Appender interface {

// Iterator iterates over timestamp/value pairs; it can step to the next value or seek forward to a timestamp.
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
type Iterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at t, it advances to the first value
// after t.
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (int64, float64)
Err() error
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}

// NewNopIterator returns a new chunk iterator that does not hold any data.
Expand All @@ -70,6 +77,7 @@ func NewNopIterator() Iterator {

// nopIterator is an iterator that holds no data: it never advances, never
// finds a value and never reports an error.
type nopIterator struct{}

// Seek always reports false, whatever timestamp is requested.
func (nopIterator) Seek(t int64) bool {
	return false
}

// At returns a zero timestamp and a zero value.
func (nopIterator) At() (int64, float64) {
	return 0, 0
}

// Next always reports false.
func (nopIterator) Next() bool {
	return false
}

// Err always returns nil.
func (nopIterator) Err() error {
	return nil
}
Expand Down
13 changes: 13 additions & 0 deletions chunkenc/xor.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,19 @@ type xorIterator struct {
err error
}

// Seek advances the iterator forward until its current timestamp is greater
// than or equal to t and reports whether such a sample was reached. If the
// current timestamp already satisfies that, the iterator is left where it is.
// It reports false when the iterator holds an error or runs out of samples
// before reaching t.
//
// NOTE(review): the "already positioned" shortcut compares against it.t even
// when no sample has been read yet, so it relies on it.t starting out below
// every timestamp callers seek to — confirm against how xorIterator is
// initialised.
func (it *xorIterator) Seek(t int64) bool {
	if it.err != nil {
		return false
	}
	// Already at or past the target: nothing to skip.
	if it.t >= t {
		return true
	}

	for it.Next() {
		if it.t >= t {
			return true
		}
	}
	return false
}

// At returns the iterator's current timestamp/value pair, i.e. its t and val
// fields.
func (it *xorIterator) At() (int64, float64) {
	ts, v := it.t, it.val
	return ts, v
}
Expand Down
79 changes: 0 additions & 79 deletions chunks/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,85 +205,6 @@ func (w *Writer) write(b []byte) error {
return err
}

// MergeOverlappingChunks folds chunks whose time ranges overlap into a single
// chunk, so that the returned metas no longer overlap each other.
// Overlapping chunks are combined with MergeChunks; the resulting meta keeps
// the earlier MinTime and the larger MaxTime.
// The input must be sorted by MinTime. With fewer than two chunks the input
// slice is returned as is.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
	if len(chks) < 2 {
		return chks, nil
	}

	// Holds the merged result; it can never grow beyond len(chks).
	merged := make([]Meta, 1, len(chks))
	merged[0] = chks[0]

	for i := 1; i < len(chks); i++ {
		next := chks[i]
		// Only the most recently emitted chunk can overlap with next: all
		// earlier ones end before it starts, and the input is sorted by
		// MinTime.
		tail := &merged[len(merged)-1]
		if next.MinTime > tail.MaxTime {
			merged = append(merged, next)
			continue
		}

		if tail.MaxTime < next.MaxTime {
			tail.MaxTime = next.MaxTime
		}
		combined, err := MergeChunks(tail.Chunk, next.Chunk)
		if err != nil {
			return nil, err
		}
		tail.Chunk = combined
	}

	return merged, nil
}

// MergeChunks combines the samples of a and b into a new XOR chunk by walking
// both iterators in step and always appending the sample with the smaller
// timestamp. When both chunks hold a sample for the same timestamp, the one
// from b is kept and the one from a is dropped.
// An error from creating the appender, or one reported by either iterator
// after it has been drained, is returned with a nil chunk.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
	merged := chunkenc.NewXORChunk()
	app, err := merged.Appender()
	if err != nil {
		return nil, err
	}

	itA, itB := a.Iterator(nil), b.Iterator(nil)
	moreA, moreB := itA.Next(), itB.Next()

	// Interleave while both sides still have samples.
	for moreA && moreB {
		ta, va := itA.At()
		tb, vb := itB.At()
		switch {
		case ta < tb:
			app.Append(ta, va)
			moreA = itA.Next()
		case tb < ta:
			app.Append(tb, vb)
			moreB = itB.Next()
		default:
			// Same timestamp on both sides: b wins, both move on.
			app.Append(tb, vb)
			moreA = itA.Next()
			moreB = itB.Next()
		}
	}

	// At most one of the two still has samples left; copy the remainder.
	for ; moreA; moreA = itA.Next() {
		ta, va := itA.At()
		app.Append(ta, va)
	}
	for ; moreB; moreB = itB.Next() {
		tb, vb := itB.At()
		app.Append(tb, vb)
	}

	if err := itA.Err(); err != nil {
		return nil, err
	}
	if err := itB.Err(); err != nil {
		return nil, err
	}
	return merged, nil
}

func (w *Writer) WriteChunks(chks ...Meta) error {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
Expand Down
33 changes: 32 additions & 1 deletion compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,

mergedChks := chks
if overlapping {
mergedChks, err = chunks.MergeOverlappingChunks(chks)
mergedChks, err = MergeOverlappingChunks(chks)
if err != nil {
return errors.Wrap(err, "merge overlapping chunks")
}
Expand Down Expand Up @@ -1032,3 +1032,34 @@ func (c *compactionMerger) Err() error {
// At returns the merger's current labels, chunk metas and intervals, i.e. its
// l, c and intervals fields.
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) {
	lset, chks, ivs := c.l, c.c, c.intervals
	return lset, chks, ivs
}

// MergeOverlappingChunks folds chunks whose time ranges overlap into a single
// chunk, so that the returned metas no longer overlap each other.
// Each overlapping pair is combined by mergeOverlappingChunks, whose result
// replaces the last emitted meta.
// The input must be sorted by MinTime. With fewer than two chunks the input
// slice is returned as is.
func MergeOverlappingChunks(chks []chunks.Meta) ([]chunks.Meta, error) {
	if len(chks) < 2 {
		return chks, nil
	}

	// NOTE(review): both iterators are handed to mergeOverlappingChunks by
	// value and never reassigned here, so they stay nil for every call unless
	// the helper hands them back some other way — confirm the intended reuse.
	var aReuseIter, bReuseIter chunkenc.Iterator

	// Holds the merged result; it can never grow beyond len(chks).
	merged := make([]chunks.Meta, 1, len(chks))
	merged[0] = chks[0]

	for i := 1; i < len(chks); i++ {
		next := chks[i]
		// Only the most recently emitted chunk can overlap with next: all
		// earlier ones end before it starts, and the input is sorted by
		// MinTime.
		tail := len(merged) - 1
		if next.MinTime > merged[tail].MaxTime {
			merged = append(merged, next)
			continue
		}

		combined, err := mergeOverlappingChunks(merged[tail], next, aReuseIter, bReuseIter)
		if err != nil {
			return nil, err
		}
		merged[tail] = combined
	}

	return merged, nil
}
Loading