From 1a9c23df0a3adbf68e26703f0eafb437359d89f4 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Mon, 12 Apr 2021 19:49:58 +0200 Subject: [PATCH] Simplify KVStore interaction with cindex thanks to hooks. --- server/etcdserver/api/v3rpc/maintenance.go | 2 +- server/etcdserver/cindex/cindex.go | 24 ++++++++++++------- server/etcdserver/cindex/cindex_test.go | 2 +- server/etcdserver/server.go | 7 +++--- server/lease/lessor.go | 11 ++------- server/mvcc/kv.go | 11 --------- server/mvcc/kvstore.go | 28 ++++------------------ server/mvcc/kvstore_bench_test.go | 16 ++++++++----- server/mvcc/kvstore_txn.go | 1 - server/mvcc/watchable_store.go | 2 +- 10 files changed, 37 insertions(+), 67 deletions(-) diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index dcacbf4978cf..38cc91371630 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -34,7 +34,7 @@ import ( ) type KVGetter interface { - KV() mvcc.ConsistentWatchableKV + KV() mvcc.WatchableKV } type BackendGetter interface { diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 4e2f02140db7..80d0bd34e691 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -23,9 +23,9 @@ import ( ) var ( - metaBucketName = []byte("meta") + MetaBucketName = []byte("meta") - consistentIndexKeyName = []byte("consistent_index") + ConsistentIndexKeyName = []byte("consistent_index") ) // ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex. @@ -68,7 +68,7 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { defer ci.mutex.Unlock() ci.tx.Lock() defer ci.tx.Unlock() - _, vs := ci.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) + _, vs := ci.tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0) if len(vs) == 0 { return 0 } @@ -83,13 +83,15 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) { func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) - if index > 0 { - bs := make([]byte, 8) // this is kept on stack (not heap) so its quick. - binary.BigEndian.PutUint64(bs, index) - // put the index into the underlying backend - // tx has been locked in TxnBegin, so there is no need to lock it again - tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) + if index == 0 { + // Never save 0 as it means that we didn't loaded the real index yet. + return } + bs := make([]byte, 8) // this is kept on stack (not heap) so its quick. + binary.BigEndian.PutUint64(bs, index) + // put the index into the underlying backend + // tx has been locked in TxnBegin, so there is no need to lock it again + tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs) } func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) { @@ -112,3 +114,7 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) { func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {} func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {} + +func UnsafeCreateMetaBucket(tx backend.BatchTx) { + tx.UnsafeCreateBucket(MetaBucketName) +} diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index aa5761c2cb09..6e3a9ad9b6c4 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -34,7 +34,7 @@ func TestConsistentIndex(t *testing.T) { t.Fatal("batch tx is nil") } tx.Lock() - tx.UnsafeCreateBucket(metaBucketName) + tx.UnsafeCreateBucket(MetaBucketName) tx.Unlock() be.ForceCommit() r := rand.Uint64() diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index b67605ef561d..6b28b7aae239 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -256,7 +256,7 @@ type EtcdServer struct { applyV3Internal applierV3Internal applyWait wait.WaitTime - kv mvcc.ConsistentWatchableKV + kv mvcc.WatchableKV lessor lease.Lessor bemu sync.Mutex be backend.Backend @@ -1212,8 +1212,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Panic("failed to restore mvcc store", zap.Error(err)) } - s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex()) - lg.Info("restored mvcc store") + lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns // on the backend are finished. @@ -2521,7 +2520,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { } } -func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv } +func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv } func (s *EtcdServer) Backend() backend.Backend { s.bemu.Lock() defer s.bemu.Unlock() diff --git a/server/lease/lessor.go b/server/lease/lessor.go index a12591e46ef1..7a1544e9395c 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -294,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { } le.leaseMap[id] = l - l.persistTo(le.b, le.ci) + l.persistTo(le.b) leaseTotalTTLs.Observe(float64(l.ttl)) leaseGranted.Inc() @@ -341,10 +341,6 @@ func (le *lessor) Revoke(id LeaseID) error { // kv deletion. Or we might end up with not executing the revoke or not // deleting the keys if etcdserver fails in between. le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) - // if len(keys) > 0, txn.End() will call ci.UnsafeSave function. - if le.ci != nil && len(keys) == 0 { - le.ci.UnsafeSave(le.b.BatchTx()) - } txn.End() @@ -828,7 +824,7 @@ func (l *Lease) expired() bool { return l.Remaining() <= 0 } -func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) { +func (l *Lease) persistTo(b backend.Backend) { key := int64ToBytes(int64(l.ID)) lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} @@ -839,9 +835,6 @@ func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) { b.BatchTx().Lock() b.BatchTx().UnsafePut(leaseBucketName, key, val) - if ci != nil { - ci.UnsafeSave(b.BatchTx()) - } b.BatchTx().Unlock() } diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index b8cd982da6eb..15ad263288a9 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -139,14 +139,3 @@ type Watchable interface { // watch events happened or happening on the KV. NewWatchStream() WatchStream } - -// ConsistentWatchableKV is a WatchableKV that understands the consistency -// algorithm and consistent index. -// If the consistent index of executing entry is not larger than the -// consistent index of ConsistentWatchableKV, all operations in -// this entry are skipped and return empty response. -type ConsistentWatchableKV interface { - WatchableKV - // ConsistentIndex returns the current consistent index of the KV. - ConsistentIndex() uint64 -} diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 2256902fee19..befd73fa65e2 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -35,9 +35,9 @@ import ( var ( keyBucketName = []byte("key") - metaBucketName = []byte("meta") + metaBucketName = cindex.MetaBucketName - consistentIndexKeyName = []byte("consistent_index") + consistentIndexKeyName = cindex.ConsistentIndexKeyName scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") @@ -128,7 +128,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.Cons tx := s.b.BatchTx() tx.Lock() tx.UnsafeCreateBucket(keyBucketName) - tx.UnsafeCreateBucket(metaBucketName) + cindex.UnsafeCreateMetaBucket(tx) tx.Unlock() s.b.ForceCommit() @@ -315,11 +315,6 @@ func init() { func (s *store) Commit() { s.mu.Lock() defer s.mu.Unlock() - - tx := s.b.BatchTx() - tx.Lock() - s.saveIndex(tx) - tx.Unlock() s.b.ForceCommit() } @@ -437,9 +432,7 @@ func (s *store) restore() error { tx.Unlock() - s.lg.Info("kvstore restored", - zap.Uint64("consistent-index", s.ConsistentIndex()), - zap.Int64("current-rev", s.currentRev)) + s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev)) if scheduledCompact != 0 { if _, err := s.compactLockfree(scheduledCompact); err != nil { @@ -534,19 +527,6 @@ func (s *store) Close() error { return nil } -func (s *store) saveIndex(tx backend.BatchTx) { - if s.ci != nil { - s.ci.UnsafeSave(tx) - } -} - -func (s *store) ConsistentIndex() uint64 { - if s.ci != nil { - return s.ci.ConsistentIndex() - } - return 0 -} - func (s *store) setupMetricsReporter() { b := s.b reportDbTotalSizeInBytesMu.Lock() diff --git a/server/mvcc/kvstore_bench_test.go b/server/mvcc/kvstore_bench_test.go index 910bacf30502..aeeddbc41717 100644 --- a/server/mvcc/kvstore_bench_test.go +++ b/server/mvcc/kvstore_bench_test.go @@ -73,19 +73,23 @@ func benchmarkStoreRange(b *testing.B, n int) { } func BenchmarkConsistentIndex(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) - defer cleanup(s, be, tmpPath) + be, _ := betesting.NewDefaultTmpBackend(b) + ci := cindex.NewConsistentIndex(be.BatchTx()) + defer betesting.Close(b, be) + + // This will force the index to be reread from scratch on each call. + ci.SetConsistentIndex(0) - tx := s.b.BatchTx() + tx := be.BatchTx() tx.Lock() - s.saveIndex(tx) + cindex.UnsafeCreateMetaBucket(tx) + ci.UnsafeSave(tx) tx.Unlock() b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.ConsistentIndex() + ci.ConsistentIndex() } } diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index aaa93d9ab944..4400d5771832 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -104,7 +104,6 @@ func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 { func (tw *storeTxnWrite) End() { // only update index if the txn modifies the mvcc state. if len(tw.changes) != 0 { - tw.s.saveIndex(tw.tx) // hold revMu lock to prevent new read txns from opening until writeback. tw.s.revMu.Lock() tw.s.currentRev++ diff --git a/server/mvcc/watchable_store.go b/server/mvcc/watchable_store.go index ff5bdb78492e..def2abda54b4 100644 --- a/server/mvcc/watchable_store.go +++ b/server/mvcc/watchable_store.go @@ -70,7 +70,7 @@ type watchableStore struct { // cancel operations. type cancelFunc func() -func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) ConsistentWatchableKV { +func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) WatchableKV { return newWatchableStore(lg, b, le, ci, cfg) }