Skip to content

Commit

Permalink
Merge pull request #17263 from siyuanfoundation/txBuf1
Browse files Browse the repository at this point in the history
Fix tx buffer inconsistency if there are unordered key writes in one tx.
  • Loading branch information
serathius authored Mar 28, 2024
2 parents 4af78d4 + 0a54362 commit a22ae62
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 7 deletions.
190 changes: 185 additions & 5 deletions server/storage/backend/batch_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package backend_test

import (
"fmt"
"math/rand"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -241,24 +243,118 @@ func TestRangeAfterDeleteMatch(t *testing.T) {
tx.Unlock()
tx.Commit()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})

tx.Lock()
tx.UnsafeDelete(schema.Test, []byte("foo"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
}

func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) {
func TestRangeAfterUnorderedKeyWriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo5"), []byte("bar5"))
tx.UnsafePut(schema.Test, []byte("foo2"), []byte("bar2"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar1"))
tx.UnsafePut(schema.Test, []byte("foo3"), []byte("bar3"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.UnsafePut(schema.Test, []byte("foo4"), []byte("bar4"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 1)
}

func TestRangeAfterAlternatingBucketWriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Key)
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafeSeqPut(schema.Key, []byte("key1"), []byte("val1"))
tx.Unlock()

tx.Lock()
ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit)
tx.UnsafeSeqPut(schema.Key, []byte("key2"), []byte("val2"))
tx.Unlock()
tx.Commit()
// only in the 2nd commit the schema.Key key is removed from the readBuffer.buckets.
// This makes sure to test the case when an empty writeBuffer.bucket
// is used to replace the read buffer bucket.
tx.Commit()

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Key, []byte("key"), []byte("key5"), 100)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1)
}

func TestRangeAfterOverwriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), []byte("foo3"), 1)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")})
}

func TestRangeAfterOverwriteAndDeleteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar11"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo"), []byte("foo1")}, [][]byte{[]byte("bar1"), []byte("bar11")})

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar3"))
tx.UnsafeDelete(schema.Test, []byte("foo1"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo"), nil, 0)
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), schema.Test, []byte("foo1"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar3")})
}

func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, bucket backend.Bucket, key, endKey []byte, limit int64) {
tx.Lock()
ks1, vs1 := tx.UnsafeRange(bucket, key, endKey, limit)
tx.Unlock()

rtx.RLock()
ks2, vs2 := rtx.UnsafeRange(schema.Test, key, endKey, limit)
ks2, vs2 := rtx.UnsafeRange(bucket, key, endKey, limit)
rtx.RUnlock()

if diff := cmp.Diff(ks1, ks2); diff != "" {
Expand Down Expand Up @@ -294,3 +390,87 @@ func checkUnsafeForEach(t *testing.T, tx backend.UnsafeReader, expectedKeys, exp
t.Errorf("values on transaction doesn't match expected, diff: %s", diff)
}
}

// runWriteback is used test the txWriteBuffer.writeback function, which is called inside tx.Unlock().
// The parameters are chosen based on defaultBatchLimit = 10000
func runWriteback(t testing.TB, kss, vss [][]string, isSeq bool) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafeCreateBucket(schema.Key)
tx.Unlock()

for i, ks := range kss {
vs := vss[i]
tx.Lock()
for j := 0; j < len(ks); j++ {
if isSeq {
tx.UnsafeSeqPut(schema.Key, []byte(ks[j]), []byte(vs[j]))
} else {
tx.UnsafePut(schema.Test, []byte(ks[j]), []byte(vs[j]))
}
}
tx.Unlock()
}
}

func BenchmarkWritebackSeqBatches1BatchSize10000(b *testing.B) { benchmarkWriteback(b, 1, 10000, true) }

func BenchmarkWritebackSeqBatches10BatchSize1000(b *testing.B) { benchmarkWriteback(b, 10, 1000, true) }

func BenchmarkWritebackSeqBatches100BatchSize100(b *testing.B) { benchmarkWriteback(b, 100, 100, true) }

func BenchmarkWritebackSeqBatches1000BatchSize10(b *testing.B) { benchmarkWriteback(b, 1000, 10, true) }

func BenchmarkWritebackNonSeqBatches1000BatchSize1(b *testing.B) {
// for non sequential writes, the batch size is usually small, 1 or the order of cluster size.
benchmarkWriteback(b, 1000, 1, false)
}

func BenchmarkWritebackNonSeqBatches10000BatchSize1(b *testing.B) {
benchmarkWriteback(b, 10000, 1, false)
}

func BenchmarkWritebackNonSeqBatches100BatchSize10(b *testing.B) {
benchmarkWriteback(b, 100, 10, false)
}

func BenchmarkWritebackNonSeqBatches1000BatchSize10(b *testing.B) {
benchmarkWriteback(b, 1000, 10, false)
}

func benchmarkWriteback(b *testing.B, batches, batchSize int, isSeq bool) {
// kss and vss are key and value arrays to write with size batches*batchSize
var kss, vss [][]string
for i := 0; i < batches; i++ {
var ks, vs []string
for j := i * batchSize; j < (i+1)*batchSize; j++ {
k := fmt.Sprintf("key%d", j)
v := fmt.Sprintf("val%d", j)
ks = append(ks, k)
vs = append(vs, v)
}
if !isSeq {
// make sure each batch is shuffled differently but the same for different test runs.
shuffleList(ks, i*batchSize)
}
kss = append(kss, ks)
vss = append(vss, vs)
}
b.ResetTimer()
for n := 1; n < b.N; n++ {
runWriteback(b, kss, vss, isSeq)
}
}

func shuffleList(l []string, seed int) {
r := rand.New(rand.NewSource(int64(seed)))
for i := 0; i < len(l); i++ {
j := r.Intn(i + 1)
l[i], l[j] = l[j], l[i]
}
}
12 changes: 10 additions & 2 deletions server/storage/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
rb, ok := txr.buckets[k]
if !ok {
delete(txw.buckets, k)
if seq, ok := txw.bucket2seq[k]; ok && !seq {
wb.dedupe()
}
txr.buckets[k] = wb
continue
}
Expand Down Expand Up @@ -218,10 +221,15 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
return
}
bb.dedupe()
}

// dedupe removes duplicates, using only newest update
func (bb *bucketBuffer) dedupe() {
if bb.used <= 1 {
return
}
sort.Stable(bb)

// remove duplicates, using only newest update
widx := 0
for ridx := 1; ridx < bb.used; ridx++ {
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
Expand Down
50 changes: 50 additions & 0 deletions server/storage/backend/tx_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,53 @@ func Test_bucketBuffer_CopyUsed(t *testing.T) {
})
}
}

func TestDedupe(t *testing.T) {
tests := []struct {
name string
keys, vals, expectedKeys, expectedVals []string
}{
{
name: "empty",
keys: []string{},
vals: []string{},
expectedKeys: []string{},
expectedVals: []string{},
},
{
name: "single kv",
keys: []string{"key1"},
vals: []string{"val1"},
expectedKeys: []string{"key1"},
expectedVals: []string{"val1"},
},
{
name: "duplicate key",
keys: []string{"key1", "key1"},
vals: []string{"val1", "val2"},
expectedKeys: []string{"key1"},
expectedVals: []string{"val2"},
},
{
name: "unordered keys",
keys: []string{"key3", "key1", "key4", "key2", "key1", "key4"},
vals: []string{"val1", "val5", "val3", "val4", "val2", "val6"},
expectedKeys: []string{"key1", "key2", "key3", "key4"},
expectedVals: []string{"val2", "val4", "val1", "val6"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bb := &bucketBuffer{buf: make([]kv, 10), used: 0}
for i := 0; i < len(tt.keys); i++ {
bb.add([]byte(tt.keys[i]), []byte(tt.vals[i]))
}
bb.dedupe()
assert.Equal(t, bb.used, len(tt.expectedKeys))
for i := 0; i < bb.used; i++ {
assert.Equal(t, bb.buf[i].key, []byte(tt.expectedKeys[i]))
assert.Equal(t, bb.buf[i].val, []byte(tt.expectedVals[i]))
}
})
}
}

0 comments on commit a22ae62

Please sign in to comment.