diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index c3a02cb4a266..ea775cc2082b 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -117,7 +117,7 @@ func migrateCommandFunc(c *migrateConfig) error { defer c.be.Close() lg := GetLogger() tx := c.be.BatchTx() - current, err := schema.DetectSchemaVersion(lg, tx) + current, err := schema.DetectSchemaVersion(lg, c.be.ReadTx()) if err != nil { lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") return err @@ -139,7 +139,7 @@ func migrateCommandFunc(c *migrateConfig) error { } func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() // Storage version is only supported since v3.6 if target.LessThan(schema.V3_6) { diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index c3a17c6ee205..50dfc1a05a94 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -74,9 +74,9 @@ func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions } func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { - tx := s.be.BatchTx() - tx.Lock() - defer tx.Unlock() + tx := s.be.ReadTx() + tx.RLock() + defer tx.RUnlock() v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx) if err != nil { return nil @@ -86,7 +86,7 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target) } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 24dad66031dd..7ec1b1212838 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -23,6 +23,7 @@ import ( ) type Backend interface { + ReadTx() backend.ReadTx BatchTx() backend.BatchTx } @@ -32,6 +33,9 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. + UnsafeConsistentIndex() uint64 + // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) @@ -73,7 +77,19 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := schema.ReadConsistentIndex(ci.be.BatchTx()) + v, term := schema.ReadConsistentIndex(ci.be.ReadTx()) + ci.SetConsistentIndex(v, term) + return v +} + +// UnsafeConsistentIndex is similar to ConsistentIndex, +// but it shouldn't lock the transaction. +func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { + if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { + return index + } + + v, term := schema.UnsafeReadConsistentIndex(ci.be.BatchTx()) ci.SetConsistentIndex(v, term) return v } @@ -106,7 +122,8 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index } func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) @@ -117,7 +134,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index e925f47d4501..dc36b5cc4b8f 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -207,8 +207,10 @@ type EtcdServer struct { term uint64 // must use atomic operations to access; keep 64-bit aligned. lead uint64 // must use atomic operations to access; keep 64-bit aligned. - consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex - r raftNode // uses 64-bit atomics; keep 64-bit aligned. + consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned. + consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned. + consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex + r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} Cfg config.ServerConfig @@ -341,6 +343,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = b.storage.backend.be + srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) srv.beHooks = b.storage.backend.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat @@ -978,6 +981,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) + newbe.SetTxPostLockHook(s.getTxPostLockHook()) + lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns @@ -1547,6 +1552,15 @@ func (s *EtcdServer) getTerm() uint64 { return atomic.LoadUint64(&s.term) } +func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) { + atomic.StoreUint64(&s.consistentIdx, cIdx) + atomic.StoreUint64(&s.consistentTerm, cTerm) +} + +func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) { + return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm) +} + func (s *EtcdServer) setLead(v uint64) { atomic.StoreUint64(&s.lead, v) } @@ -1771,7 +1785,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1801,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -2296,3 +2310,12 @@ func (s *EtcdServer) raftStatus() raft.Status { func (s *EtcdServer) Version() *serverversion.Manager { return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s)) } + +func (s *EtcdServer) getTxPostLockHook() func() { + return func() { + cIdx, term := s.getConsistentIndexAndTerm() + if cIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(cIdx, term) + } + } +} diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 07636c6e468c..0ac5532b015b 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) consistIndex := srv.consistIndex.ConsistentIndex() - if consistIndex != appliedi { - t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi) - } + assert.Equal(t, uint64(2), appliedi) t.Run("verify-backend", func(t *testing.T) { tx := be.BatchTx() @@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := schema.ReadConsistentIndex(be.BatchTx()) + rindex, _ := schema.ReadConsistentIndex(be.BatchTx()) assert.Equal(t, consistIndex, rindex) - assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 0a77fd669da4..4b5e96a071c9 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() schema.UnsafeCreateLeaseBucket(tx) lpbs := schema.MustUnsafeGetAllLeases(tx) tx.Unlock() diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index c558ecacd68a..4e8ef6d07b69 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -67,6 +67,9 @@ type Backend interface { Defrag() error ForceCommit() Close() error + + // SetTxPostLockHook sets a txPostLockHook. + SetTxPostLockHook(func()) } type Snapshot interface { @@ -119,6 +122,9 @@ type backend struct { hooks Hooks + // txPostLockHook is called each time right after locking the tx. + txPostLockHook func() + lg *zap.Logger } @@ -230,6 +236,14 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } +func (b *backend) SetTxPostLockHook(hook func()) { + // It needs to lock the batchTx, because the periodic commit + // may be accessing the txPostLockHook at the moment. + b.batchTx.LockWithoutHook() + defer b.batchTx.Unlock() + b.txPostLockHook = hook +} + func (b *backend) ReadTx() ReadTx { return b.readTx } // ConcurrentReadTx creates and returns a new ReadTx, which: @@ -439,7 +453,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.Lock() + b.batchTx.LockWithoutHook() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index b2b0ad7cbf02..e427606b9c2c 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -53,6 +53,9 @@ type BatchTx interface { Commit() // CommitAndStop commits the previous tx and does not create a new one. CommitAndStop() + + // LockWithoutHook doesn't execute the txPostLockHook. + LockWithoutHook() } type batchTx struct { @@ -64,6 +67,13 @@ type batchTx struct { } func (t *batchTx) Lock() { + t.LockWithoutHook() + if t.backend.txPostLockHook != nil { + t.backend.txPostLockHook() + } +} + +func (t *batchTx) LockWithoutHook() { t.Mutex.Lock() } @@ -214,14 +224,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } @@ -291,13 +301,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 846f83cde18d..9b79c090afc9 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() tx.UnsafeCreateBucket(schema.Key) schema.UnsafeCreateMetaBucket(tx) tx.Unlock() @@ -331,7 +331,7 @@ func (s *store) restore() error { // restore index tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index ba944008216b..941f056a9219 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index c200c0d9273e..86ed4f00b470 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -875,6 +875,7 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } +func (b *fakeBatchTx) LockWithoutHook() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -916,6 +917,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) SetTxPostLockHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 605bb3a0bfd0..09a49994da46 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -34,7 +34,7 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend { func (s *alarmBackend) CreateAlarmBucket() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Alarm) } diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index 93ef34c371ef..fc334a8bcf91 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend { func (abe *authBackend) CreateAuthBuckets() { tx := abe.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Auth) tx.UnsafeCreateBucket(AuthUsers) diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index d7b06b9cef78..38eea6f9179f 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Meta) } @@ -51,8 +51,8 @@ func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { // ReadConsistentIndex loads consistent index and term from given transaction. // returns 0 if the data are not found. func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeReadConsistentIndex(tx) } diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 844b50a85a4f..153699e694aa 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { // from the v3 backend. func (s *membershipBackend) TrimClusterFromBackend() error { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeDeleteBucket(Cluster) return nil @@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M func (s *membershipBackend) TrimMembershipFromBackend() error { s.lg.Info("Trimming membership information from the backend...") tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() err := tx.UnsafeForEach(Members, func(k, v []byte) error { tx.UnsafeDelete(Members, k) @@ -167,7 +167,7 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr func (s *membershipBackend) MustCreateBackendBuckets() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Members) tx.UnsafeCreateBucket(MembersRemoved) diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index 47734b4b851a..e1e44dab5f90 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla } func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return p.unsafeExecute(lg, tx) } @@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) // execute runs actions required to migrate etcd storage between two minor versions. func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return s.unsafeExecute(lg, tx) } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 850b55d5bd56..2b4c15f29c84 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -31,7 +31,7 @@ var ( // Validate checks provided backend to confirm that schema used is supported. func Validate(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return unsafeValidate(lg, tx) } @@ -60,7 +60,7 @@ type WALVersion interface { // Migrate updates storage schema to provided target version. // Downgrading requires that provided WAL doesn't contain unsupported entries. func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return UnsafeMigrate(lg, tx, w, target) } @@ -89,8 +89,8 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semv // * v3.5 will return it's version if it includes all storage fields added in v3.5 (might require a snapshot). // * v3.4 and older is not supported and will return error. func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeDetectSchemaVersion(lg, tx) }