From 2dbecea5b26282b5ded675b5ba5922976340784f Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Mon, 12 Apr 2021 19:49:58 +0200
Subject: [PATCH] Simplify KVStore interaction with cindex thanks to hooks.

---
 server/etcdserver/api/v3rpc/maintenance.go |  2 +-
 server/etcdserver/cindex/cindex.go         | 78 ++++++++++++++--------
 server/etcdserver/cindex/cindex_test.go    |  8 +--
 server/etcdserver/server.go                |  7 +-
 server/lease/lessor.go                     | 11 +--
 server/mvcc/kv.go                          | 11 ---
 server/mvcc/kvstore.go                     | 29 +-------
 server/mvcc/kvstore_bench_test.go          | 31 +++++----
 server/mvcc/kvstore_txn.go                 |  1 -
 server/mvcc/watchable_store.go             |  9 ++-
 10 files changed, 84 insertions(+), 103 deletions(-)

diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go
index dcacbf4978c..38cc9137163 100644
--- a/server/etcdserver/api/v3rpc/maintenance.go
+++ b/server/etcdserver/api/v3rpc/maintenance.go
@@ -34,7 +34,7 @@ import (
 )
 
 type KVGetter interface {
-	KV() mvcc.ConsistentWatchableKV
+	KV() mvcc.WatchableKV
 }
 
 type BackendGetter interface {
diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go
index 73e96fd70c4..6f8661b6d2a 100644
--- a/server/etcdserver/cindex/cindex.go
+++ b/server/etcdserver/cindex/cindex.go
@@ -28,6 +28,10 @@ var (
 	ConsistentIndexKeyName = []byte("consistent_index")
 )
 
+type Backend interface {
+	BatchTx() backend.BatchTx
+}
+
 // ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
 type ConsistentIndexer interface {
 
@@ -41,32 +45,38 @@ type ConsistentIndexer interface {
 	// It saves consistentIndex to the underlying stable storage.
 	UnsafeSave(tx backend.BatchTx)
 
-	// SetBatchTx set the available backend.BatchTx for ConsistentIndexer.
-	SetBatchTx(tx backend.BatchTx)
+	// SetBackend sets the available backend.Backend for ConsistentIndexer.
+	SetBackend(be Backend)
 }
 
 // consistentIndex implements the ConsistentIndexer interface.
 type consistentIndex struct {
-	tx backend.BatchTx
 	// consistentIndex represents the offset of an entry in a consistent replica log.
-	// it caches the "consistent_index" key's value. Accessed
-	// through atomics so must be 64-bit aligned.
+	// it caches the "consistent_index" key's value.
+	// Accessed through atomics so must be 64-bit aligned.
 	consistentIndex uint64
-	mutex           sync.Mutex
+
+	// be is used for the initial read of consistentIndex.
+	be Backend
+	// mutex protects be.
+	mutex sync.Mutex
 }
 
-func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
-	return &consistentIndex{tx: tx}
+// NewConsistentIndex creates a new consistent index.
+// If `be` is nil, it must be set (via SetBackend) before the first call to `ConsistentIndex()`.
+func NewConsistentIndex(be Backend) ConsistentIndexer {
+	return &consistentIndex{be: be}
 }
 
 func (ci *consistentIndex) ConsistentIndex() uint64 {
-
 	if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
 		return index
 	}
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()
-	v := ReadConsistentIndex(ci.tx)
+
+	v := ReadConsistentIndex(ci.be.BatchTx())
+
 	atomic.StoreUint64(&ci.consistentIndex, v)
 	return v
 }
@@ -76,18 +86,15 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {
 
 func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
 	index := atomic.LoadUint64(&ci.consistentIndex)
-
-	if index == 0 {
-		// Never save 0 as it means that we didn't loaded the real index yet.
-		return
-	}
-	unsafeUpdateConsistentIndex(tx, index)
+	UnsafeUpdateConsistentIndex(tx, index, true)
 }
 
-func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
+func (ci *consistentIndex) SetBackend(be Backend) {
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()
-	ci.tx = tx
+	ci.be = be
+	// After the backend is changed, the first access should re-read it.
+	ci.SetConsistentIndex(0)
 }
 
 func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
@@ -102,13 +109,21 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
 	atomic.StoreUint64(&f.index, index)
 }
 
-func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
-func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}
+func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
+func (f *fakeConsistentIndex) SetBackend(_ Backend)         {}
 
+// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exist yet).
 func UnsafeCreateMetaBucket(tx backend.BatchTx) {
 	tx.UnsafeCreateBucket(MetaBucketName)
 }
 
+// CreateMetaBucket creates the `meta` bucket (if it does not exist yet).
+func CreateMetaBucket(tx backend.BatchTx) {
+	tx.Lock()
+	defer tx.Unlock()
+	tx.UnsafeCreateBucket(MetaBucketName)
+}
+
 // unsafeGetConsistentIndex loads consistent index from given transaction.
 // returns 0 if the data are not found.
 func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
@@ -128,7 +143,19 @@ func ReadConsistentIndex(tx backend.ReadTx) uint64 {
 	return unsafeReadConsistentIndex(tx)
 }
 
-func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) {
+func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
+	if index == 0 {
+		// Never save 0 as it means that we haven't loaded the real index yet.
+		return
+	}
+
+	if onlyGrow {
+		oldi := unsafeReadConsistentIndex(tx)
+		if index <= oldi {
+			return
+		}
+	}
+
 	bs := make([]byte, 8) // this is kept on stack (not heap) so its quick.
 	binary.BigEndian.PutUint64(bs, index)
 	// put the index into the underlying backend
@@ -136,13 +163,8 @@ func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) {
 	tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
 }
 
-func UpdateConsistentIndex(tx backend.BatchTx, index uint64) {
+func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
 	tx.Lock()
 	defer tx.Unlock()
-
-	oldi := unsafeReadConsistentIndex(tx)
-	if index <= oldi {
-		return
-	}
-	unsafeUpdateConsistentIndex(tx, index)
+	UnsafeUpdateConsistentIndex(tx, index, onlyGrow)
 }
diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go
index eb577b8fd64..c500260d38a 100644
--- a/server/etcdserver/cindex/cindex_test.go
+++ b/server/etcdserver/cindex/cindex_test.go
@@ -27,13 +27,14 @@ import (
 
 func TestConsistentIndex(t *testing.T) {
 	be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
-	ci := NewConsistentIndex(be.BatchTx())
+	ci := NewConsistentIndex(be)
 
 	tx := be.BatchTx()
 	if tx == nil {
 		t.Fatal("batch tx is nil")
 	}
 	tx.Lock()
+
 	UnsafeCreateMetaBucket(tx)
 	tx.Unlock()
 	be.ForceCommit()
@@ -51,14 +52,13 @@ func TestConsistentIndex(t *testing.T) {
 
 	b := backend.NewDefaultBackend(tmpPath)
 	defer b.Close()
-	ci.SetConsistentIndex(0)
-	ci.SetBatchTx(b.BatchTx())
+	ci.SetBackend(b)
 	index = ci.ConsistentIndex()
 	if index != r {
 		t.Errorf("expected %d,got %d", r, index)
 	}
 
-	ci = NewConsistentIndex(b.BatchTx())
+	ci = NewConsistentIndex(b)
 	index = ci.ConsistentIndex()
 	if index != r {
 		t.Errorf("expected %d,got %d", r, index)
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 9ea22aa35dc..2a76dca2185 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -256,7 +256,7 @@ type EtcdServer struct {
 	applyV3Internal applierV3Internal
 	applyWait       wait.WaitTime
 
-	kv         mvcc.ConsistentWatchableKV
+	kv         mvcc.WatchableKV
 	lessor     lease.Lessor
 	bemu       sync.Mutex
 	be         backend.Backend
@@ -1210,8 +1210,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		lg.Panic("failed to restore mvcc store", zap.Error(err))
 	}
 
-	s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex())
-	lg.Info("restored mvcc store")
+	lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
 
 	// Closing old backend might block until all the txns
 	// on the backend are finished.
@@ -2522,7 +2521,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 	}
 }
 
-func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
+func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv }
 func (s *EtcdServer) Backend() backend.Backend {
 	s.bemu.Lock()
 	defer s.bemu.Unlock()
diff --git a/server/lease/lessor.go b/server/lease/lessor.go
index a12591e46ef..7a1544e9395 100644
--- a/server/lease/lessor.go
+++ b/server/lease/lessor.go
@@ -294,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
 	}
 
 	le.leaseMap[id] = l
-	l.persistTo(le.b, le.ci)
+	l.persistTo(le.b)
 
 	leaseTotalTTLs.Observe(float64(l.ttl))
 	leaseGranted.Inc()
@@ -341,10 +341,6 @@ func (le *lessor) Revoke(id LeaseID) error {
 	// kv deletion. Or we might end up with not executing the revoke or not
 	// deleting the keys if etcdserver fails in between.
 	le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
-	// if len(keys) > 0, txn.End() will call ci.UnsafeSave function.
-	if le.ci != nil && len(keys) == 0 {
-		le.ci.UnsafeSave(le.b.BatchTx())
-	}
 
 	txn.End()
 
@@ -828,7 +824,7 @@ func (l *Lease) expired() bool {
 	return l.Remaining() <= 0
 }
 
-func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
+func (l *Lease) persistTo(b backend.Backend) {
 	key := int64ToBytes(int64(l.ID))
 
 	lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
@@ -839,9 +835,6 @@ func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
 
 	b.BatchTx().Lock()
 	b.BatchTx().UnsafePut(leaseBucketName, key, val)
-	if ci != nil {
-		ci.UnsafeSave(b.BatchTx())
-	}
 	b.BatchTx().Unlock()
 }
diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go
index b8cd982da6e..15ad263288a 100644
--- a/server/mvcc/kv.go
+++ b/server/mvcc/kv.go
@@ -139,14 +139,3 @@ type Watchable interface {
 	// watch events happened or happening on the KV.
 	NewWatchStream() WatchStream
 }
-
-// ConsistentWatchableKV is a WatchableKV that understands the consistency
-// algorithm and consistent index.
-// If the consistent index of executing entry is not larger than the
-// consistent index of ConsistentWatchableKV, all operations in
-// this entry are skipped and return empty response.
-type ConsistentWatchableKV interface {
-	WatchableKV
-	// ConsistentIndex returns the current consistent index of the KV.
-	ConsistentIndex() uint64
-}
diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go
index ac79b07b1c5..df39301136a 100644
--- a/server/mvcc/kvstore.go
+++ b/server/mvcc/kvstore.go
@@ -69,8 +69,6 @@ type store struct {
 	// mu read locks for txns and write locks for non-txn store changes.
 	mu sync.RWMutex
 
-	ci cindex.ConsistentIndexer
-
 	b       backend.Backend
 	kvindex index
 
@@ -94,7 +92,7 @@ type store struct {
 
 // NewStore returns a new store. It is useful to create a store inside
 // mvcc pkg. It should only be used for testing externally.
-func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store {
+func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
 	if lg == nil {
 		lg = zap.NewNop()
 	}
@@ -104,7 +102,6 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.Cons
 	s := &store{
 		cfg:     cfg,
 		b:       b,
-		ci:      ci,
 		kvindex: newTreeIndex(lg),
 
 		le: le,
@@ -314,11 +311,6 @@ func init() {
 
 func (s *store) Commit() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-
-	tx := s.b.BatchTx()
-	tx.Lock()
-	s.saveIndex(tx)
-	tx.Unlock()
 	s.b.ForceCommit()
 }
@@ -342,8 +334,6 @@ func (s *store) Restore(b backend.Backend) error {
 
 	s.fifoSched = schedule.NewFIFOScheduler()
 	s.stopc = make(chan struct{})
-	s.ci.SetBatchTx(b.BatchTx())
-	s.ci.SetConsistentIndex(0)
 
 	return s.restore()
 }
@@ -436,9 +426,7 @@ func (s *store) restore() error {
 
 	tx.Unlock()
 
-	s.lg.Info("kvstore restored",
-		zap.Uint64("consistent-index", s.ConsistentIndex()),
-		zap.Int64("current-rev", s.currentRev))
+	s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev))
 
 	if scheduledCompact != 0 {
 		if _, err := s.compactLockfree(scheduledCompact); err != nil {
@@ -533,19 +521,6 @@ func (s *store) Close() error {
 	return nil
 }
 
-func (s *store) saveIndex(tx backend.BatchTx) {
-	if s.ci != nil {
-		s.ci.UnsafeSave(tx)
-	}
-}
-
-func (s *store) ConsistentIndex() uint64 {
-	if s.ci != nil {
-		return s.ci.ConsistentIndex()
-	}
-	return 0
-}
-
 func (s *store) setupMetricsReporter() {
 	b := s.b
 	reportDbTotalSizeInBytesMu.Lock()
diff --git a/server/mvcc/kvstore_bench_test.go b/server/mvcc/kvstore_bench_test.go
index 910bacf3050..9ea70dad414 100644
--- a/server/mvcc/kvstore_bench_test.go
+++ b/server/mvcc/kvstore_bench_test.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"go.etcd.io/etcd/pkg/v3/traceutil"
 	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
 	"go.etcd.io/etcd/server/v3/lease"
@@ -28,7 +29,7 @@ import (
 
 func BenchmarkStorePut(b *testing.B) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -47,7 +48,7 @@ func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) }
 
 func benchmarkStoreRange(b *testing.B, n int) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// 64 byte key/val
@@ -73,26 +74,30 @@ func benchmarkStoreRange(b *testing.B, n int) {
 }
 
 func BenchmarkConsistentIndex(b *testing.B) {
-	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
-	defer cleanup(s, be, tmpPath)
+	be, _ := betesting.NewDefaultTmpBackend(b)
+	ci := cindex.NewConsistentIndex(be)
+	defer betesting.Close(b, be)
+
+	// This will force the index to be reread from scratch on each call.
+	ci.SetConsistentIndex(0)
 
-	tx := s.b.BatchTx()
+	tx := be.BatchTx()
 	tx.Lock()
-	s.saveIndex(tx)
+	cindex.UnsafeCreateMetaBucket(tx)
+	ci.UnsafeSave(tx)
 	tx.Unlock()
 
 	b.ReportAllocs()
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		s.ConsistentIndex()
+		ci.ConsistentIndex()
 	}
 }
 
 // BenchmarkStoreTxnPutUpdate is same as above, but instead updates single key
 func BenchmarkStorePutUpdate(b *testing.B) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -110,7 +115,7 @@ func BenchmarkStorePutUpdate(b *testing.B) {
 // some synchronization operations, such as mutex locking.
 func BenchmarkStoreTxnPut(b *testing.B) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -130,7 +135,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 // benchmarkStoreRestore benchmarks the restore operation
 func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 	be, tmpPath := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 	// use closure to capture 's' to pick up the reassignment
 	defer func() { cleanup(s, be, tmpPath) }()
 
@@ -146,11 +151,11 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 			txn.End()
 		}
 	}
-	s.Close()
+	assert.NoError(b, s.Close())
 
 	b.ReportAllocs()
 	b.ResetTimer()
-	s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
+	s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{})
 }
 
 func BenchmarkStoreRestoreRevs1(b *testing.B) {
diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go
index 870c710ec24..155a13d0730 100644
--- a/server/mvcc/kvstore_txn.go
+++ b/server/mvcc/kvstore_txn.go
@@ -104,7 +104,6 @@ func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
 func (tw *storeTxnWrite) End() {
 	// only update index if the txn modifies the mvcc state.
 	if len(tw.changes) != 0 {
-		tw.s.saveIndex(tw.tx)
 		// hold revMu lock to prevent new read txns from opening until writeback.
 		tw.s.revMu.Lock()
 		tw.s.currentRev++
diff --git a/server/mvcc/watchable_store.go b/server/mvcc/watchable_store.go
index ff5bdb78492..63529ed672e 100644
--- a/server/mvcc/watchable_store.go
+++ b/server/mvcc/watchable_store.go
@@ -20,7 +20,6 @@ import (
 
 	"go.etcd.io/etcd/api/v3/mvccpb"
 	"go.etcd.io/etcd/pkg/v3/traceutil"
-	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
 	"go.etcd.io/etcd/server/v3/lease"
 	"go.etcd.io/etcd/server/v3/mvcc/backend"
 
@@ -70,16 +69,16 @@ type watchableStore struct {
 // cancel operations.
 type cancelFunc func()
 
-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) ConsistentWatchableKV {
-	return newWatchableStore(lg, b, le, ci, cfg)
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
+	return newWatchableStore(lg, b, le, cfg)
 }
 
-func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *watchableStore {
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
 	if lg == nil {
 		lg = zap.NewNop()
 	}
 	s := &watchableStore{
-		store:    NewStore(lg, b, le, ci, cfg),
+		store:    NewStore(lg, b, le, cfg),
 		victimc:  make(chan struct{}, 1),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
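
Usage sketch: after this patch nothing inside mvcc persists the consistent
index anymore (saveIndex is gone, and NewStore/New no longer take a
cindex.ConsistentIndexer); per the subject line that duty moves to hooks
owned by the backend, which this diff does not show. The sketch below is a
minimal illustration that relies only on signatures visible above;
persistIndex is a hypothetical stand-in for such a hook body, not an API
from this change.

	package example

	import (
		"go.etcd.io/etcd/server/v3/etcdserver/cindex"
		"go.etcd.io/etcd/server/v3/lease"
		"go.etcd.io/etcd/server/v3/mvcc"
		"go.etcd.io/etcd/server/v3/mvcc/backend"

		"go.uber.org/zap"
	)

	// newKV wires up the watchable KV store without a ConsistentIndexer,
	// matching the new mvcc.New signature from this patch.
	func newKV(lg *zap.Logger, be backend.Backend, le lease.Lessor) (mvcc.WatchableKV, cindex.ConsistentIndexer) {
		// The index now reads lazily through the Backend interface: the
		// first ConsistentIndex() call consults be.BatchTx() and caches.
		ci := cindex.NewConsistentIndex(be)
		return mvcc.New(lg, be, le, mvcc.StoreConfig{}), ci
	}

	// persistIndex is a hypothetical hook body: since the store no longer
	// saves the index in Commit() or on txn End(), something commit-adjacent
	// must call UnsafeSave while holding the batch-transaction lock.
	func persistIndex(ci cindex.ConsistentIndexer, tx backend.BatchTx) {
		tx.Lock()
		defer tx.Unlock()
		ci.UnsafeSave(tx)
	}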