Skip to content

Commit

Permalink
add a txPostLockHook into the backend
Browse files Browse the repository at this point in the history
Previously the SetConsistentIndex() is called during the apply workflow,
but it's outside the db transaction. If a commit happens between SetConsistentIndex
and the following apply workflow, and etcd crashes for whatever reason right
after the commit, then etcd commits an incomplete transaction to db.
Eventually etcd runs into the data inconsistency issue.

In this commit, we move the SetConsistentIndex into a txPostLockHook, so
it will be executed inside the transaction lock.
  • Loading branch information
ahrtr committed Mar 30, 2022
1 parent 0e83f62 commit f72fbb0
Show file tree
Hide file tree
Showing 17 changed files with 104 additions and 41 deletions.
4 changes: 2 additions & 2 deletions etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func migrateCommandFunc(c *migrateConfig) error {
defer c.be.Close()
lg := GetLogger()
tx := c.be.BatchTx()
current, err := schema.DetectSchemaVersion(lg, tx)
current, err := schema.DetectSchemaVersion(lg, c.be.ReadTx())
if err != nil {
lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
return err
Expand All @@ -139,7 +139,7 @@ func migrateCommandFunc(c *migrateConfig) error {
}

func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
tx.Lock()
tx.LockWithoutHook()
defer tx.Unlock()
// Storage version is only supported since v3.6
if target.LessThan(schema.V3_6) {
Expand Down
8 changes: 4 additions & 4 deletions server/etcdserver/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions
}

func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx := s.be.ReadTx()
tx.RLock()
defer tx.RUnlock()
v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx)
if err != nil {
return nil
Expand All @@ -86,7 +86,7 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {

func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error {
tx := s.be.BatchTx()
tx.Lock()
tx.LockWithoutHook()
defer tx.Unlock()
return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target)
}
23 changes: 20 additions & 3 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

type Backend interface {
ReadTx() backend.ReadTx
BatchTx() backend.BatchTx
}

Expand All @@ -32,6 +33,9 @@ type ConsistentIndexer interface {
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64

// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
UnsafeConsistentIndex() uint64

// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64, term uint64)

Expand Down Expand Up @@ -73,7 +77,19 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()

v, term := schema.ReadConsistentIndex(ci.be.BatchTx())
v, term := schema.ReadConsistentIndex(ci.be.ReadTx())
ci.SetConsistentIndex(v, term)
return v
}

// UnsafeConsistentIndex is similar to ConsistentIndex,
// but it shouldn't lock the transaction.
func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
return index
}

v, term := schema.UnsafeReadConsistentIndex(ci.be.BatchTx())
ci.SetConsistentIndex(v, term)
return v
}
Expand Down Expand Up @@ -106,7 +122,8 @@ type fakeConsistentIndex struct {
term uint64
}

func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index }

func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
Expand All @@ -117,7 +134,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}

func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
tx.LockWithoutHook()
defer tx.Unlock()
schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}
31 changes: 27 additions & 4 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,10 @@ type EtcdServer struct {
term uint64 // must use atomic operations to access; keep 64-bit aligned.
lead uint64 // must use atomic operations to access; keep 64-bit aligned.

consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
r raftNode // uses 64-bit atomics; keep 64-bit aligned.
consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned.
consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned.
consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
r raftNode // uses 64-bit atomics; keep 64-bit aligned.

readych chan struct{}
Cfg config.ServerConfig
Expand Down Expand Up @@ -341,6 +343,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)

srv.be = b.storage.backend.be
srv.be.SetTxPostLockHook(srv.getTxPostLockHook())
srv.beHooks = b.storage.backend.beHooks
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat

Expand Down Expand Up @@ -978,6 +981,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}

s.consistIndex.SetBackend(newbe)
newbe.SetTxPostLockHook(s.getTxPostLockHook())

lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))

// Closing old backend might block until all the txns
Expand Down Expand Up @@ -1547,6 +1552,15 @@ func (s *EtcdServer) getTerm() uint64 {
return atomic.LoadUint64(&s.term)
}

func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) {
atomic.StoreUint64(&s.consistentIdx, cIdx)
atomic.StoreUint64(&s.consistentTerm, cTerm)
}

func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) {
return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm)
}

func (s *EtcdServer) setLead(v uint64) {
atomic.StoreUint64(&s.lead, v)
}
Expand Down Expand Up @@ -1771,7 +1785,7 @@ func (s *EtcdServer) apply(

// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.setConsistentIndexAndTerm(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}

Expand Down Expand Up @@ -1801,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.setConsistentIndexAndTerm(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
s.lg.Debug("apply entry normal",
Expand Down Expand Up @@ -2296,3 +2310,12 @@ func (s *EtcdServer) raftStatus() raft.Status {
func (s *EtcdServer) Version() *serverversion.Manager {
return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s))
}

func (s *EtcdServer) getTxPostLockHook() func() {
return func() {
cIdx, term := s.getConsistentIndexAndTerm()
if cIdx > s.consistIndex.UnsafeConsistentIndex() {
s.consistIndex.SetConsistentIndex(cIdx, term)
}
}
}
7 changes: 2 additions & 5 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {

_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
consistIndex := srv.consistIndex.ConsistentIndex()
if consistIndex != appliedi {
t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
}
assert.Equal(t, uint64(2), appliedi)

t.Run("verify-backend", func(t *testing.T) {
tx := be.BatchTx()
Expand All @@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := schema.ReadConsistentIndex(be.BatchTx())
rindex, _ := schema.ReadConsistentIndex(be.BatchTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)
}

func realisticRaftNode(lg *zap.Logger) *raftNode {
Expand Down
2 changes: 1 addition & 1 deletion server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()

tx.Lock()
tx.LockWithoutHook()
schema.UnsafeCreateLeaseBucket(tx)
lpbs := schema.MustUnsafeGetAllLeases(tx)
tx.Unlock()
Expand Down
16 changes: 15 additions & 1 deletion server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Backend interface {
Defrag() error
ForceCommit()
Close() error

// SetTxPostLockHook sets a txPostLockHook.
SetTxPostLockHook(func())
}

type Snapshot interface {
Expand Down Expand Up @@ -119,6 +122,9 @@ type backend struct {

hooks Hooks

// txPostLockHook is called each time right after locking the tx.
txPostLockHook func()

lg *zap.Logger
}

Expand Down Expand Up @@ -230,6 +236,14 @@ func (b *backend) BatchTx() BatchTx {
return b.batchTx
}

func (b *backend) SetTxPostLockHook(hook func()) {
// It needs to lock the batchTx, because the periodic commit
// may be accessing the txPostLockHook at the moment.
b.batchTx.LockWithoutHook()
defer b.batchTx.Unlock()
b.txPostLockHook = hook
}

func (b *backend) ReadTx() ReadTx { return b.readTx }

// ConcurrentReadTx creates and returns a new ReadTx, which:
Expand Down Expand Up @@ -439,7 +453,7 @@ func (b *backend) defrag() error {
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b.batchTx.Lock()
b.batchTx.LockWithoutHook()
defer b.batchTx.Unlock()

// lock database after lock tx to avoid deadlock.
Expand Down
18 changes: 14 additions & 4 deletions server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type BatchTx interface {
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()

// LockWithoutHook doesn't execute the txPostLockHook.
LockWithoutHook()
}

type batchTx struct {
Expand All @@ -64,6 +67,13 @@ type batchTx struct {
}

func (t *batchTx) Lock() {
t.LockWithoutHook()
if t.backend.txPostLockHook != nil {
t.backend.txPostLockHook()
}
}

func (t *batchTx) LockWithoutHook() {
t.Mutex.Lock()
}

Expand Down Expand Up @@ -214,14 +224,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error)

// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
t.Lock()
t.LockWithoutHook()
t.commit(false)
t.Unlock()
}

// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
t.Lock()
t.LockWithoutHook()
t.commit(true)
t.Unlock()
}
Expand Down Expand Up @@ -291,13 +301,13 @@ func (t *batchTxBuffered) Unlock() {
}

func (t *batchTxBuffered) Commit() {
t.Lock()
t.LockWithoutHook()
t.commit(false)
t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
t.Lock()
t.LockWithoutHook()
t.commit(true)
t.Unlock()
}
Expand Down
4 changes: 2 additions & 2 deletions server/storage/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
}

tx := s.b.BatchTx()
tx.Lock()
tx.LockWithoutHook()
tx.UnsafeCreateBucket(schema.Key)
schema.UnsafeCreateMetaBucket(tx)
tx.Unlock()
Expand Down Expand Up @@ -331,7 +331,7 @@ func (s *store) restore() error {

// restore index
tx := s.b.BatchTx()
tx.Lock()
tx.LockWithoutHook()

finishedCompact, found := UnsafeReadFinishedCompact(tx)
if found {
Expand Down
2 changes: 1 addition & 1 deletion server/storage/mvcc/kvstore_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
start := time.Now()

tx := s.b.BatchTx()
tx.Lock()
tx.LockWithoutHook()
keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
for _, key := range keys {
rev = bytesToRev(key)
Expand Down
2 changes: 2 additions & 0 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ type fakeBatchTx struct {
rangeRespc chan rangeResp
}

func (b *fakeBatchTx) LockWithoutHook() {}
func (b *fakeBatchTx) Lock() {}
func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) RLock() {}
Expand Down Expand Up @@ -916,6 +917,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetTxPostLockHook(func()) {}

type indexGetResp struct {
rev revision
Expand Down
2 changes: 1 addition & 1 deletion server/storage/schema/alarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend {

func (s *alarmBackend) CreateAlarmBucket() {
tx := s.be.BatchTx()
tx.Lock()
tx.LockWithoutHook()
defer tx.Unlock()
tx.UnsafeCreateBucket(Alarm)
}
Expand Down
2 changes: 1 addition & 1 deletion server/storage/schema/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend {

func (abe *authBackend) CreateAuthBuckets() {
tx := abe.be.BatchTx()
tx.Lock()
tx.LockWithoutHook()
defer tx.Unlock()
tx.UnsafeCreateBucket(Auth)
tx.UnsafeCreateBucket(AuthUsers)
Expand Down
6 changes: 3 additions & 3 deletions server/storage/schema/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) {

// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func CreateMetaBucket(tx backend.BatchTx) {
tx.Lock()
tx.LockWithoutHook()
defer tx.Unlock()
tx.UnsafeCreateBucket(Meta)
}
Expand All @@ -51,8 +51,8 @@ func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
// ReadConsistentIndex loads consistent index and term from given transaction.
// returns 0 if the data are not found.
func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
tx.Lock()
defer tx.Unlock()
tx.RLock()
defer tx.RUnlock()
return UnsafeReadConsistentIndex(tx)
}

Expand Down
Loading

0 comments on commit f72fbb0

Please sign in to comment.