Skip to content

Commit

Permalink
Use immutable binary tree for index
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Dec 6, 2024
1 parent 5198aca commit 8674a56
Show file tree
Hide file tree
Showing 4 changed files with 575 additions and 609 deletions.
241 changes: 105 additions & 136 deletions server/storage/mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package mvcc

import (
"sync"

"bytes"
"fmt"
"github.com/google/btree"
"go.uber.org/zap"
)
Expand All @@ -37,96 +37,110 @@ type index interface {
}

type treeIndex struct {
sync.RWMutex
tree *btree.BTreeG[*keyIndex]
lg *zap.Logger
baseRev int64
revisionTree []*btree.BTreeG[keyRev]
lg *zap.Logger
}

func newTreeIndex(lg *zap.Logger) index {
return &treeIndex{
tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
return aki.Less(bki)
}),
lg: lg,
}
func (ti *treeIndex) Equal(b index) bool {

Check warning on line 45 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L45

Added line #L45 was not covered by tests
//TODO implement me
panic("implement me")

Check warning on line 47 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L47

Added line #L47 was not covered by tests
}

func (ti *treeIndex) Put(key []byte, rev Revision) {
keyi := &keyIndex{key: key}

ti.Lock()
defer ti.Unlock()
okeyi, ok := ti.tree.Get(keyi)
if !ok {
keyi.put(ti.lg, rev.Main, rev.Sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi.put(ti.lg, rev.Main, rev.Sub)
func (ti *treeIndex) Insert(ki *keyIndex) {

Check warning on line 50 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L50

Added line #L50 was not covered by tests
//TODO implement me
panic("implement me")

Check warning on line 52 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L52

Added line #L52 was not covered by tests
}

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
ti.RLock()
defer ti.RUnlock()
return ti.unsafeGet(key, atRev)
func (ti *treeIndex) KeyIndex(ki *keyIndex) *keyIndex {

Check warning on line 55 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L55

Added line #L55 was not covered by tests
//TODO implement me
panic("implement me")

Check warning on line 57 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L57

Added line #L57 was not covered by tests
}

func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
keyi := &keyIndex{key: key}
if keyi = ti.keyIndex(keyi); keyi == nil {
return Revision{}, Revision{}, 0, ErrRevisionNotFound
}
return keyi.get(ti.lg, atRev)
type keyRev struct {
key []byte
mod, created Revision
version int64
}

func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
ti.RLock()
defer ti.RUnlock()
return ti.keyIndex(keyi)
var lessThen btree.LessFunc[keyRev] = func(k keyRev, k2 keyRev) bool {
return compare(k, k2) == -1
}

func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
if ki, ok := ti.tree.Get(keyi); ok {
return ki
}
return nil
func compare(k keyRev, k2 keyRev) int {
return bytes.Compare(k.key, k2.key)
}

func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) {
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
func newTreeIndex(lg *zap.Logger) index {
return &treeIndex{
baseRev: -1,
lg: lg,
}
}

ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
if !f(item) {
return false
func (ti *treeIndex) Put(key []byte, rev Revision) {
if ti.baseRev == -1 {
ti.baseRev = rev.Main - 1
ti.revisionTree = []*btree.BTreeG[keyRev]{
btree.NewG[keyRev](32, lessThen),
}
return true
}
if rev.Main != ti.rev()+1 {
panic(fmt.Sprintf("append only, lastRev: %d, putRev: %d", ti.rev(), rev.Main))

Check warning on line 89 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L89

Added line #L89 was not covered by tests
}
prevTree := ti.revisionTree[len(ti.revisionTree)-1]
item, found := prevTree.Get(keyRev{key: key})
created := rev
var version int64 = 1
if found {
created = item.created
version = item.version + 1
}
newTree := prevTree.Clone()
newTree.ReplaceOrInsert(keyRev{
key: key,
mod: rev,
created: created,
version: version,
})
ti.revisionTree = append(ti.revisionTree, newTree)
}

func (ti *treeIndex) rev() int64 {
return ti.baseRev + int64(len(ti.revisionTree)) - 1
}

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
idx := atRev - ti.baseRev
if idx < 0 || idx >= int64(len(ti.revisionTree)) {
return Revision{}, Revision{}, 0, ErrRevisionNotFound
}
tree := ti.revisionTree[idx]
keyRev, found := tree.Get(keyRev{key: key})
if !found {
return Revision{}, Revision{}, 0, ErrRevisionNotFound
}
return keyRev.mod, keyRev.created, keyRev.version, nil
}

// Revisions returns limited number of revisions from key(included) to end(excluded)
// at the given rev. The returned slice is sorted in the order of key. There is no limit if limit <= 0.
// The second return parameter isn't capped by the limit and reflects the total number of revisions.
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []Revision, total int) {
ti.RLock()
defer ti.RUnlock()

if end == nil {
rev, _, _, err := ti.unsafeGet(key, atRev)
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil, 0
}
return []Revision{rev}, 1
}
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
if limit <= 0 || len(revs) < limit {
revs = append(revs, rev)
}
total++
idx := atRev - ti.baseRev
tree := ti.revisionTree[idx]
tree.AscendRange(keyRev{key: key}, keyRev{key: end}, func(kr keyRev) bool {
if limit <= 0 || len(revs) < limit {
revs = append(revs, kr.mod)
}
total++
return true
})
return revs, total
Expand All @@ -135,119 +149,74 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []
// CountRevisions returns the number of revisions
// from key(included) to end(excluded) at the given rev.
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
ti.RLock()
defer ti.RUnlock()

if end == nil {
_, _, _, err := ti.unsafeGet(key, atRev)
_, _, _, err := ti.Get(key, atRev)
if err != nil {
return 0
}
return 1
}
idx := atRev - ti.baseRev
tree := ti.revisionTree[idx]

Check warning on line 160 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L159-L160

Added lines #L159 - L160 were not covered by tests
total := 0
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
total++
}
tree.AscendRange(keyRev{key: key}, keyRev{key: end}, func(kr keyRev) bool {
total++

Check warning on line 163 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L162-L163

Added lines #L162 - L163 were not covered by tests
return true
})
return total
}

func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []Revision) {
ti.RLock()
defer ti.RUnlock()

if end == nil {
rev, _, _, err := ti.unsafeGet(key, atRev)
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil, nil
}
return [][]byte{key}, []Revision{rev}
}
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
keys = append(keys, ki.key)
}
idx := atRev - ti.baseRev
tree := ti.revisionTree[idx]
tree.AscendRange(keyRev{key: key}, keyRev{key: end}, func(kr keyRev) bool {
revs = append(revs, kr.mod)
keys = append(keys, kr.key)

Check warning on line 181 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L177-L181

Added lines #L177 - L181 were not covered by tests
return true
})
return keys, revs
}

func (ti *treeIndex) Tombstone(key []byte, rev Revision) error {
keyi := &keyIndex{key: key}

ti.Lock()
defer ti.Unlock()
ki, ok := ti.tree.Get(keyi)
if !ok {
if rev.Main != ti.rev()+1 {
panic(fmt.Sprintf("append only, lastRev: %d, putRev: %d", ti.rev(), rev.Main))

Check warning on line 189 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L189

Added line #L189 was not covered by tests
}
prevTree := ti.revisionTree[len(ti.revisionTree)-1]
newTree := prevTree.Clone()
_, found := prevTree.Delete(keyRev{
key: key,
})
ti.revisionTree = append(ti.revisionTree, newTree)
if !found {
return ErrRevisionNotFound
}

return ki.tombstone(ti.lg, rev.Main, rev.Sub)
return nil
}

func (ti *treeIndex) Compact(rev int64) map[Revision]struct{} {
available := make(map[Revision]struct{})
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
ti.Lock()
clone := ti.tree.Clone()
ti.Unlock()

clone.Ascend(func(keyi *keyIndex) bool {
// Lock is needed here to prevent modification to the keyIndex while
// compaction is going on or revision added to empty before deletion
ti.Lock()
keyi.compact(ti.lg, rev, available)
if keyi.isEmpty() {
_, ok := ti.tree.Delete(keyi)
if !ok {
ti.lg.Panic("failed to delete during compaction")
}
}
ti.Unlock()
return true
})
idx := rev - ti.baseRev
ti.revisionTree = ti.revisionTree[idx:]
ti.baseRev = rev

Check warning on line 208 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L206-L208

Added lines #L206 - L208 were not covered by tests
return available
}

// Keep finds all revisions to be kept for a Compaction at the given rev.
func (ti *treeIndex) Keep(rev int64) map[Revision]struct{} {
available := make(map[Revision]struct{})
ti.RLock()
defer ti.RUnlock()
ti.tree.Ascend(func(keyi *keyIndex) bool {
keyi.keep(rev, available)
idx := rev - ti.baseRev
tree := ti.revisionTree[idx]
tree.Ascend(func(item keyRev) bool {
available[item.mod] = struct{}{}

Check warning on line 218 in server/storage/mvcc/index.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/index.go#L218

Added line #L218 was not covered by tests
return true
})
return available
}

func (ti *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)

if ti.tree.Len() != b.tree.Len() {
return false
}

equal := true

ti.tree.Ascend(func(aki *keyIndex) bool {
bki, _ := b.tree.Get(aki)
if !aki.equal(bki) {
equal = false
return false
}
return true
})

return equal
}

func (ti *treeIndex) Insert(ki *keyIndex) {
ti.Lock()
defer ti.Unlock()
ti.tree.ReplaceOrInsert(ki)
}
18 changes: 9 additions & 9 deletions server/storage/mvcc/index_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ import (
"go.uber.org/zap"
)

func BenchmarkIndexCompactBase(b *testing.B) { benchmarkIndexCompact(b, 3, 100) }
func BenchmarkIndexCompactLongKey(b *testing.B) { benchmarkIndexCompact(b, 512, 100) }
func BenchmarkIndexCompactLargeKeySpace(b *testing.B) { benchmarkIndexCompact(b, 3, 100000) }
//func BenchmarkIndexCompactBase(b *testing.B) { benchmarkIndexCompact(b, 3, 100) }
//func BenchmarkIndexCompactLongKey(b *testing.B) { benchmarkIndexCompact(b, 512, 100) }
//func BenchmarkIndexCompactLargeKeySpace(b *testing.B) { benchmarkIndexCompact(b, 3, 100000) }

func BenchmarkIndexKeepBase(b *testing.B) { benchmarkIndexKeep(b, 3, 100) }
func BenchmarkIndexKeepLongKey(b *testing.B) { benchmarkIndexKeep(b, 512, 100) }
func BenchmarkIndexKeepLargeKeySpace(b *testing.B) { benchmarkIndexKeep(b, 3, 100000) }
//func BenchmarkIndexKeepBase(b *testing.B) { benchmarkIndexKeep(b, 3, 100) }
//func BenchmarkIndexKeepLongKey(b *testing.B) { benchmarkIndexKeep(b, 512, 100) }
//func BenchmarkIndexKeepLargeKeySpace(b *testing.B) { benchmarkIndexKeep(b, 3, 100000) }

func BenchmarkIndexPutBase(b *testing.B) { benchmarkIndexPut(b, 3, 100) }
func BenchmarkIndexPutLongKey(b *testing.B) { benchmarkIndexPut(b, 512, 100) }
func BenchmarkIndexPutLargeKeySpace(b *testing.B) { benchmarkIndexPut(b, 3, 100000) }

func BenchmarkIndexTombstoneBase(b *testing.B) { benchmarkIndexTombstone(b, 3, 100, 25) }
func BenchmarkIndexTombstoneLongKey(b *testing.B) { benchmarkIndexTombstone(b, 512, 100, 25) }
func BenchmarkIndexTombstoneLargeKeySpace(b *testing.B) { benchmarkIndexTombstone(b, 3, 100000, 25) }
//func BenchmarkIndexTombstoneBase(b *testing.B) { benchmarkIndexTombstone(b, 3, 100, 25) }
//func BenchmarkIndexTombstoneLongKey(b *testing.B) { benchmarkIndexTombstone(b, 512, 100, 25) }
//func BenchmarkIndexTombstoneLargeKeySpace(b *testing.B) { benchmarkIndexTombstone(b, 3, 100000, 25) }

func BenchmarkIndexGetBase(b *testing.B) { benchmarkIndexGet(b, 3, 100, 1, 25) }
func BenchmarkIndexGetRepeatedKeys(b *testing.B) { benchmarkIndexGet(b, 3, 100, 1000, 25) }
Expand Down
Loading

0 comments on commit 8674a56

Please sign in to comment.