From f72fbb0a8ff7e09dbbc7a16fddf57e68458646f2 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Wed, 30 Mar 2022 15:26:31 +0800 Subject: [PATCH] add a txPostLockHook into the backend Previously, SetConsistentIndex() was called during the apply workflow, but outside the db transaction. If a commit happened between SetConsistentIndex() and the rest of the apply workflow, and etcd crashed for whatever reason right after that commit, etcd would have persisted an incomplete transaction to the db and would eventually run into a data inconsistency issue. In this commit, we move the SetConsistentIndex() call into a txPostLockHook, so that it is executed while the transaction lock is held. --- etcdutl/etcdutl/migrate_command.go | 4 +-- server/etcdserver/adapters.go | 8 +++--- server/etcdserver/cindex/cindex.go | 23 ++++++++++++++--- server/etcdserver/server.go | 31 ++++++++++++++++++++--- server/etcdserver/server_test.go | 7 ++--- server/lease/lessor.go | 2 +- server/storage/backend/backend.go | 16 +++++++++++- server/storage/backend/batch_tx.go | 18 ++++++++++--- server/storage/mvcc/kvstore.go | 4 +-- server/storage/mvcc/kvstore_compaction.go | 2 +- server/storage/mvcc/kvstore_test.go | 2 ++ server/storage/schema/alarm.go | 2 +- server/storage/schema/auth.go | 2 +- server/storage/schema/cindex.go | 6 ++--- server/storage/schema/membership.go | 6 ++--- server/storage/schema/migration.go | 4 +-- server/storage/schema/schema.go | 8 +++--- 17 files changed, 104 insertions(+), 41 deletions(-) diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index c3a02cb4a266..ea775cc2082b 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -117,7 +117,7 @@ func migrateCommandFunc(c *migrateConfig) error { defer c.be.Close() lg := GetLogger() tx := c.be.BatchTx() - current, err := schema.DetectSchemaVersion(lg, tx) + current, err := schema.DetectSchemaVersion(lg, c.be.ReadTx()) if err != nil { lg.Error("failed to detect storage version. 
Please make sure you are using data dir from etcd v3.5 and older") return err @@ -139,7 +139,7 @@ func migrateCommandFunc(c *migrateConfig) error { } func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() // Storage version is only supported since v3.6 if target.LessThan(schema.V3_6) { diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index c3a17c6ee205..50dfc1a05a94 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -74,9 +74,9 @@ func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions } func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { - tx := s.be.BatchTx() - tx.Lock() - defer tx.Unlock() + tx := s.be.ReadTx() + tx.RLock() + defer tx.RUnlock() v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx) if err != nil { return nil @@ -86,7 +86,7 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target) } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 24dad66031dd..7ec1b1212838 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -23,6 +23,7 @@ import ( ) type Backend interface { + ReadTx() backend.ReadTx BatchTx() backend.BatchTx } @@ -32,6 +33,9 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. + UnsafeConsistentIndex() uint64 + // SetConsistentIndex set the consistent index of current executing entry. 
SetConsistentIndex(v uint64, term uint64) @@ -73,7 +77,19 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := schema.ReadConsistentIndex(ci.be.BatchTx()) + v, term := schema.ReadConsistentIndex(ci.be.ReadTx()) + ci.SetConsistentIndex(v, term) + return v +} + +// UnsafeConsistentIndex is similar to ConsistentIndex, +// but it shouldn't lock the transaction. +func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { + if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { + return index + } + + v, term := schema.UnsafeReadConsistentIndex(ci.be.BatchTx()) ci.SetConsistentIndex(v, term) return v } @@ -106,7 +122,8 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index } func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) @@ -117,7 +134,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index e925f47d4501..dc36b5cc4b8f 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -207,8 +207,10 @@ type EtcdServer struct { term uint64 // must use atomic operations to access; keep 64-bit aligned. lead uint64 // must use atomic operations to access; keep 64-bit aligned. - consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex - r raftNode // uses 64-bit atomics; keep 64-bit aligned. 
+ consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned. + consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned. + consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex + r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} Cfg config.ServerConfig @@ -341,6 +343,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = b.storage.backend.be + srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) srv.beHooks = b.storage.backend.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat @@ -978,6 +981,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) + newbe.SetTxPostLockHook(s.getTxPostLockHook()) + lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns @@ -1547,6 +1552,15 @@ func (s *EtcdServer) getTerm() uint64 { return atomic.LoadUint64(&s.term) } +func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) { + atomic.StoreUint64(&s.consistentIdx, cIdx) + atomic.StoreUint64(&s.consistentTerm, cTerm) +} + +func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) { + return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm) +} + func (s *EtcdServer) setLead(v uint64) { atomic.StoreUint64(&s.lead, v) } @@ -1771,7 +1785,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1801,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of 
current executing entry - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -2296,3 +2310,12 @@ func (s *EtcdServer) raftStatus() raft.Status { func (s *EtcdServer) Version() *serverversion.Manager { return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s)) } + +func (s *EtcdServer) getTxPostLockHook() func() { + return func() { + cIdx, term := s.getConsistentIndexAndTerm() + if cIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(cIdx, term) + } + } +} diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 07636c6e468c..0ac5532b015b 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) consistIndex := srv.consistIndex.ConsistentIndex() - if consistIndex != appliedi { - t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi) - } + assert.Equal(t, uint64(2), appliedi) t.Run("verify-backend", func(t *testing.T) { tx := be.BatchTx() @@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := schema.ReadConsistentIndex(be.BatchTx()) + rindex, _ := schema.ReadConsistentIndex(be.BatchTx()) assert.Equal(t, consistIndex, rindex) - assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 0a77fd669da4..4b5e96a071c9 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - 
tx.Lock() + tx.LockWithoutHook() schema.UnsafeCreateLeaseBucket(tx) lpbs := schema.MustUnsafeGetAllLeases(tx) tx.Unlock() diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index c558ecacd68a..4e8ef6d07b69 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -67,6 +67,9 @@ type Backend interface { Defrag() error ForceCommit() Close() error + + // SetTxPostLockHook sets a txPostLockHook. + SetTxPostLockHook(func()) } type Snapshot interface { @@ -119,6 +122,9 @@ type backend struct { hooks Hooks + // txPostLockHook is called each time right after locking the tx. + txPostLockHook func() + lg *zap.Logger } @@ -230,6 +236,14 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } +func (b *backend) SetTxPostLockHook(hook func()) { + // It needs to lock the batchTx, because the periodic commit + // may be accessing the txPostLockHook at the moment. + b.batchTx.LockWithoutHook() + defer b.batchTx.Unlock() + b.txPostLockHook = hook +} + func (b *backend) ReadTx() ReadTx { return b.readTx } // ConcurrentReadTx creates and returns a new ReadTx, which: @@ -439,7 +453,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.Lock() + b.batchTx.LockWithoutHook() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index b2b0ad7cbf02..e427606b9c2c 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -53,6 +53,9 @@ type BatchTx interface { Commit() // CommitAndStop commits the previous tx and does not create a new one. CommitAndStop() + + // LockWithoutHook doesn't execute the txPostLockHook. 
+ LockWithoutHook() } type batchTx struct { @@ -64,6 +67,13 @@ type batchTx struct { } func (t *batchTx) Lock() { + t.LockWithoutHook() + if t.backend.txPostLockHook != nil { + t.backend.txPostLockHook() + } +} + +func (t *batchTx) LockWithoutHook() { t.Mutex.Lock() } @@ -214,14 +224,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } @@ -291,13 +301,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 846f83cde18d..9b79c090afc9 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() tx.UnsafeCreateBucket(schema.Key) schema.UnsafeCreateMetaBucket(tx) tx.Unlock() @@ -331,7 +331,7 @@ func (s *store) restore() error { // restore index tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index ba944008216b..941f056a9219 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() keys, _ 
:= tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index c200c0d9273e..86ed4f00b470 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -875,6 +875,7 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } +func (b *fakeBatchTx) LockWithoutHook() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -916,6 +917,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) SetTxPostLockHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 605bb3a0bfd0..09a49994da46 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -34,7 +34,7 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend { func (s *alarmBackend) CreateAlarmBucket() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Alarm) } diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index 93ef34c371ef..fc334a8bcf91 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend { func (abe *authBackend) CreateAuthBuckets() { tx := abe.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Auth) tx.UnsafeCreateBucket(AuthUsers) diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index d7b06b9cef78..38eea6f9179f 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // 
CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Meta) } @@ -51,8 +51,8 @@ func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { // ReadConsistentIndex loads consistent index and term from given transaction. // returns 0 if the data are not found. func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeReadConsistentIndex(tx) } diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 844b50a85a4f..153699e694aa 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { // from the v3 backend. func (s *membershipBackend) TrimClusterFromBackend() error { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeDeleteBucket(Cluster) return nil @@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M func (s *membershipBackend) TrimMembershipFromBackend() error { s.lg.Info("Trimming membership information from the backend...") tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() err := tx.UnsafeForEach(Members, func(k, v []byte) error { tx.UnsafeDelete(Members, k) @@ -167,7 +167,7 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr func (s *membershipBackend) MustCreateBackendBuckets() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Members) tx.UnsafeCreateBucket(MembersRemoved) diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index 47734b4b851a..e1e44dab5f90 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -49,7 +49,7 @@ 
func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla } func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return p.unsafeExecute(lg, tx) } @@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) // execute runs actions required to migrate etcd storage between two minor versions. func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return s.unsafeExecute(lg, tx) } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 850b55d5bd56..2b4c15f29c84 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -31,7 +31,7 @@ var ( // Validate checks provided backend to confirm that schema used is supported. func Validate(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return unsafeValidate(lg, tx) } @@ -60,7 +60,7 @@ type WALVersion interface { // Migrate updates storage schema to provided target version. // Downgrading requires that provided WAL doesn't contain unsupported entries. func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return UnsafeMigrate(lg, tx, w, target) } @@ -89,8 +89,8 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semv // * v3.5 will return it's version if it includes all storage fields added in v3.5 (might require a snapshot). // * v3.4 and older is not supported and will return error. func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeDetectSchemaVersion(lg, tx) }