Skip to content

Commit

Permalink
Simplify KVstore dependency on cindex.
Browse files Browse the repository at this point in the history
  • Loading branch information
ptabor committed May 4, 2021
1 parent d721559 commit 860b572
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 71 deletions.
4 changes: 1 addition & 3 deletions etcdctl/ctlv2/command/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
tx.Lock()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
ci := cindex.NewConsistentIndex(tx)
ci.SetConsistentIndex(idx)
ci.UnsafeSave(tx)
cindex.UpdateConsistentIndex(tx, idx)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.
Expand Down
5 changes: 2 additions & 3 deletions etcdctl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
func (s *v3Manager) updateCIndex(commit uint64) error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
ci := cindex.NewConsistentIndex(be.BatchTx())
ci.SetConsistentIndex(commit)
ci.UnsafeSave(be.BatchTx())

cindex.UpdateConsistentIndex(be.BatchTx(), commit)
return nil
}
38 changes: 19 additions & 19 deletions server/mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }

func testKVRange(t *testing.T, f rangeFunc) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }

func testKVRangeRev(t *testing.T, f rangeFunc) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }

func testKVRangeBadRev(t *testing.T, f rangeFunc) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

put3TestKVs(s)
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }

func testKVRangeLimit(t *testing.T, f rangeFunc) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

kvs := put3TestKVs(s)
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF

func testKVPutMultipleTimes(t *testing.T, f putFunc) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -321,7 +321,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {

for i, tt := range tests {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})

s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
Expand All @@ -341,7 +341,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t

func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
Expand All @@ -362,7 +362,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
// test that range, put, delete on single key in sequence repeatedly works correctly.
func TestKVOperationInSequence(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestKVOperationInSequence(t *testing.T) {

func TestKVTxnBlockWriteOperations(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})

tests := []func(){
func() { s.Put([]byte("foo"), nil, lease.NoLease) },
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) {

func TestKVTxnNonBlockRange(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

txn := s.Write(traceutil.TODO())
Expand All @@ -464,7 +464,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
// test that txn range, put, delete on single key in sequence repeatedly works correctly.
func TestKVTxnOperationInSequence(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -514,7 +514,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {

func TestKVCompactReserveLastValue(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar0"), 1)
Expand Down Expand Up @@ -568,7 +568,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {

func TestKVCompactBad(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
Expand Down Expand Up @@ -601,7 +601,7 @@ func TestKVHash(t *testing.T) {
for i := 0; i < len(hashes); i++ {
var err error
b, tmpPath := betesting.NewDefaultTmpBackend(t)
kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
hashes[i], _, err = kv.Hash()
Expand Down Expand Up @@ -639,7 +639,7 @@ func TestKVRestore(t *testing.T) {
}
for i, tt := range tests {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
tt(s)
var kvss [][]mvccpb.KeyValue
for k := int64(0); k < 10; k++ {
Expand All @@ -651,7 +651,7 @@ func TestKVRestore(t *testing.T) {
s.Close()

// ns should recover the the previous state from backend.
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})

if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
Expand Down Expand Up @@ -683,7 +683,7 @@ func readGaugeInt(g prometheus.Gauge) int {

func TestKVSnapshot(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

wkvs := put3TestKVs(s)
Expand All @@ -703,7 +703,7 @@ func TestKVSnapshot(t *testing.T) {
}
f.Close()

ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer ns.Close()
r, err := ns.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{})
if err != nil {
Expand All @@ -719,7 +719,7 @@ func TestKVSnapshot(t *testing.T) {

func TestWatchableKVWatch(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
defer cleanup(s, b, tmpPath)

w := s.NewWatchStream()
Expand Down
6 changes: 3 additions & 3 deletions server/mvcc/kvstore_compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestScheduleCompaction(t *testing.T) {
}
for i, tt := range tests {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
tx := s.b.BatchTx()

tx.Lock()
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestScheduleCompaction(t *testing.T) {

func TestCompactAllAndRestore(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)

s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
Expand All @@ -127,7 +127,7 @@ func TestCompactAllAndRestore(t *testing.T) {
t.Fatal(err)
}

s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
if s1.Rev() != rev {
t.Errorf("rev = %v, want %v", s1.Rev(), rev)
}
Expand Down
20 changes: 10 additions & 10 deletions server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (

func TestStoreRev(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer s.Close()

for i := 1; i <= 3; i++ {
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestRestoreDelete(t *testing.T) {
defer func() { restoreChunkKeys = oldChunk }()

b, _ := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})

keys := make(map[string]struct{})
for i := 0; i < 20; i++ {
Expand All @@ -451,7 +451,7 @@ func TestRestoreDelete(t *testing.T) {
}
s.Close()

s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer s.Close()
for i := 0; i < 20; i++ {
ks := fmt.Sprintf("foo-%d", i)
Expand All @@ -473,7 +473,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
tests := []string{"recreate", "restore"}
for _, test := range tests {
b, _ := betesting.NewDefaultTmpBackend(t)
s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})

s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
Expand All @@ -492,7 +492,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
var s *store
switch test {
case "recreate":
s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
case "restore":
s0.Restore(b)
s = s0
Expand Down Expand Up @@ -534,7 +534,7 @@ type hashKVResult struct {
// TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting.
func TestHashKVWhenCompacting(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)

rev := 10000
Expand Down Expand Up @@ -602,7 +602,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
// correct hash value with latest revision.
func TestHashKVZeroRevision(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)

rev := 10000
Expand Down Expand Up @@ -635,7 +635,7 @@ func TestTxnPut(t *testing.T) {
vals := createBytesSlice(bytesN, sliceN)

b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

for i := 0; i < sliceN; i++ {
Expand All @@ -651,7 +651,7 @@ func TestTxnPut(t *testing.T) {
// TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
func TestConcurrentReadNotBlockingWrite(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)

// write something to read later
Expand Down Expand Up @@ -720,7 +720,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
mu sync.Mutex // mu protects committedKVs
)
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)

var wg sync.WaitGroup
Expand Down
11 changes: 5 additions & 6 deletions server/mvcc/watchable_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"

"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/lease"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"

Expand All @@ -29,7 +28,7 @@ import (

func BenchmarkWatchableStorePut(b *testing.B) {
be, tmpPath := betesting.NewDefaultTmpBackend(b)
s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
s := New(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, be, tmpPath)

// arbitrary number of bytes
Expand All @@ -49,7 +48,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
// some synchronization operations, such as mutex locking.
func BenchmarkWatchableStoreTxnPut(b *testing.B) {
be, tmpPath := betesting.NewDefaultTmpBackend(b)
s := New(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
s := New(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, be, tmpPath)

// arbitrary number of bytes
Expand Down Expand Up @@ -80,7 +79,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {

func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
be, tmpPath := betesting.NewDefaultTmpBackend(b)
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, be, tmpPath)

k := []byte("testkey")
Expand Down Expand Up @@ -123,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
// we should put to simulate the real-world use cases.
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
be, tmpPath := betesting.NewDefaultTmpBackend(b)
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})

// manually create watchableStore instead of newWatchableStore
// because newWatchableStore periodically calls syncWatchersLoop
Expand Down Expand Up @@ -180,7 +179,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {

func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
be, tmpPath := betesting.NewDefaultTmpBackend(b)
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})

defer func() {
s.store.Close()
Expand Down
Loading

0 comments on commit 860b572

Please sign in to comment.