Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the data inconsistency issue by moving the SetConsistentIndex into the transaction lock #13854

Merged
merged 6 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir

if !v3 {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeCreateMetaBucket(tx)
schema.UnsafeUpdateConsistentIndex(tx, idx, term, false)
Expand Down
4 changes: 2 additions & 2 deletions etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type migrateConfig struct {
func migrateCommandFunc(c *migrateConfig) error {
defer c.be.Close()
tx := c.be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, tx)
current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
return err
Expand All @@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error {
}

func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
// Storage version is only supported since v3.6
if target.LessThan(schema.V3_6) {
Expand Down
4 changes: 2 additions & 2 deletions server/auth/range_perm_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"go.uber.org/zap"
)

func getMergedPerms(tx AuthBatchTx, userName string) *unifiedRangePermissions {
func getMergedPerms(tx AuthReadTx, userName string) *unifiedRangePermissions {
user := tx.UnsafeGetUser(userName)
if user == nil {
return nil
Expand Down Expand Up @@ -103,7 +103,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
return false
}

func (as *authStore) isRangeOpPermitted(tx AuthBatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
func (as *authStore) isRangeOpPermitted(tx AuthReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
// assumption: tx is Lock()ed
_, ok := as.rangePermCache[userName]
if !ok {
Expand Down
14 changes: 10 additions & 4 deletions server/auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ type TokenProvider interface {
type AuthBackend interface {
CreateAuthBuckets()
ForceCommit()
ReadTx() AuthReadTx
BatchTx() AuthBatchTx

GetUser(string) *authpb.User
Expand Down Expand Up @@ -345,7 +346,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
// CompareHashAndPassword is very expensive, so we use closures
// to avoid putting it in the critical section of the tx lock.
revision, err := func() (uint64, error) {
tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()

Expand Down Expand Up @@ -373,7 +374,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {

func (as *authStore) Recover(be AuthBackend) {
as.be = be
tx := be.BatchTx()
tx := be.ReadTx()
tx.Lock()

enabled := tx.UnsafeReadAuthEnabled()
Expand Down Expand Up @@ -855,7 +856,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
return ErrAuthOldRevision
}

tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()

Expand Down Expand Up @@ -897,7 +898,10 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
return ErrUserEmpty
}

u := as.be.GetUser(authInfo.Username)
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()
u := tx.UnsafeGetUser(authInfo.Username)

if u == nil {
return ErrUserNotFound
Expand Down Expand Up @@ -935,6 +939,8 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i

be.CreateAuthBuckets()
tx := be.BatchTx()
// We should call LockOutsideApply here, but the txPostLockHoos isn't set
ahrtr marked this conversation as resolved.
Show resolved Hide resolved
// to EtcdServer yet, so it's OK.
tx.Lock()
enabled := tx.UnsafeReadAuthEnabled()
as := &authStore{
Expand Down
4 changes: 4 additions & 0 deletions server/auth/store_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (b *backendMock) CreateAuthBuckets() {
func (b *backendMock) ForceCommit() {
}

func (b *backendMock) ReadTx() AuthReadTx {
return &txMock{be: b}
}

func (b *backendMock) BatchTx() AuthBatchTx {
return &txMock{be: b}
}
Expand Down
8 changes: 4 additions & 4 deletions server/etcdserver/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
s.bemu.RLock()
defer s.bemu.RUnlock()

tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx := s.be.ReadTx()
tx.RLock()
defer tx.RUnlock()
v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx)
if err != nil {
return nil
Expand All @@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error
defer s.bemu.RUnlock()

tx := s.be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target)
}
2 changes: 1 addition & 1 deletion server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s
}
}
if beExist {
err = schema.Validate(cfg.Logger, be.BatchTx())
err = schema.Validate(cfg.Logger, be.ReadTx())
if err != nil {
cfg.Logger.Error("Failed to validate schema", zap.Error(err))
return nil, err
Expand Down
60 changes: 57 additions & 3 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

type Backend interface {
ReadTx() backend.ReadTx
BatchTx() backend.BatchTx
}

Expand All @@ -32,9 +33,18 @@ type ConsistentIndexer interface {
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64

// ConsistentApplyingIndex returns the consistent applying index of current executing entry.
ConsistentApplyingIndex() (uint64, uint64)

// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
UnsafeConsistentIndex() uint64

// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64, term uint64)

// SetConsistentApplyingIndex set the consistent applying index of current executing entry.
SetConsistentApplyingIndex(v uint64, term uint64)

// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
UnsafeSave(tx backend.BatchTx)
Expand All @@ -54,6 +64,19 @@ type consistentIndex struct {
// The value is being persisted in the backend since v3.5.
term uint64

// applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index
// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
// saved to consistentIndex and term above in the txPostLockInsideApplyHook.
//
// TODO(ahrtr): try to remove the OnPreCommitUnsafe, and compare the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do the performance check before merging this PR? I would prefer not to backport a PR with todo. Please let me know if you need help with measuring it.

Copy link
Member Author

@ahrtr ahrtr Apr 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are all good so far. I will investigate whether or not we should completely remove the OnPreCommitUnsafe in the next step only in the main branch. The change (I mean possibly removing OnPreCommitUnsafe) will not be cherry picked to 3.5.

We can use the tools/benchmark, correct?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to backport this change because it makes the code much simpler, which will be important for long term maintenance of release-3.5 branch. Just making sure that the decision not to fix it imminently is not rushed, let's not sacrifice the quality, however it you think it's not worth backporting that's also ok.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your comment is valid, but usually we only backport bug fixes to previous release, so does this PR. But completely possibly removing OnPreCommitUnsafe is a refactor, and so I don't think we should backport it to 3.5. Please note that refactoring may also introduce regression, so we should only do it in main branch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this PR is good to go.

// performance difference. Afterwards we can make a decision on whether
// or not we should remove OnPreCommitUnsafe. If it is true, then we
// can remove applyingIndex and applyingTerm, and save the e.Index and
// e.Term to consistentIndex and term directly in applyEntries, and
// persist them into db in the txPostLockInsideApplyHook.
applyingIndex uint64
applyingTerm uint64

// be is used for initial read consistentIndex
be Backend
// mutex is protecting be.
Expand All @@ -73,7 +96,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()

v, term := schema.ReadConsistentIndex(ci.be.BatchTx())
v, term := schema.ReadConsistentIndex(ci.be.ReadTx())
ci.SetConsistentIndex(v, term)
return v
}

func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
return index
}

v, term := schema.UnsafeReadConsistentIndex(ci.be.ReadTx())
ci.SetConsistentIndex(v, term)
return v
}
Expand All @@ -97,6 +130,15 @@ func (ci *consistentIndex) SetBackend(be Backend) {
ci.SetConsistentIndex(0, 0)
}

func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
}

func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
atomic.StoreUint64(&ci.applyingIndex, v)
atomic.StoreUint64(&ci.applyingTerm, term)
}

func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}
Expand All @@ -106,18 +148,30 @@ type fakeConsistentIndex struct {
term uint64
}

func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
return atomic.LoadUint64(&f.index)
}
func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
}
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
return atomic.LoadUint64(&f.index)
}

func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}
func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}

func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}

func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}
35 changes: 33 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
})
}

// Set the hook after EtcdServer finishes the initialization to avoid
// the hook being called during the initialization process.
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
ahrtr marked this conversation as resolved.
Show resolved Hide resolved

// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
Logger: cfg.Logger,
Expand Down Expand Up @@ -978,6 +982,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}

s.consistIndex.SetBackend(newbe)
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())

lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))

// Closing old backend might block until all the txns
Expand Down Expand Up @@ -1771,7 +1777,7 @@ func (s *EtcdServer) apply(

// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}

Expand All @@ -1798,10 +1804,18 @@ func (s *EtcdServer) apply(
// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
applyV3Performed := false
defer func() {
// The txPostLock callback will not get called in this case,
// so we should set the consistent index directly.
if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
s.lg.Debug("apply entry normal",
Expand Down Expand Up @@ -1853,6 +1867,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
applyV3Performed = true
ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
}

Expand Down Expand Up @@ -1895,6 +1910,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None
s.r.ApplyConfChange(cc)

// The txPostLock callback will not get called in this case,
// so we should set the consistent index directly.
if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
}
return false, err
}

Expand Down Expand Up @@ -2296,3 +2318,12 @@ func (s *EtcdServer) raftStatus() raft.Status {
func (s *EtcdServer) Version() *serverversion.Manager {
return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s))
}

func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
return func() {
applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
}
}
}
7 changes: 2 additions & 5 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {

_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
consistIndex := srv.consistIndex.ConsistentIndex()
if consistIndex != appliedi {
t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
}
assert.Equal(t, uint64(2), appliedi)

t.Run("verify-backend", func(t *testing.T) {
tx := be.BatchTx()
Expand All @@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := schema.ReadConsistentIndex(be.BatchTx())
rindex, _ := schema.ReadConsistentIndex(be.ReadTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)
}

func realisticRaftNode(lg *zap.Logger) *raftNode {
Expand Down
4 changes: 2 additions & 2 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()

tx.Lock()
tx.LockOutsideApply()
schema.UnsafeCreateLeaseBucket(tx)
lpbs := schema.MustUnsafeGetAllLeases(tx)
tx.Unlock()
Expand Down Expand Up @@ -845,7 +845,7 @@ func (l *Lease) expired() bool {
func (l *Lease) persistTo(b backend.Backend) {
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
tx := b.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
schema.MustUnsafePutLease(tx, &lpb)
}
Expand Down
2 changes: 1 addition & 1 deletion server/storage/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil
Expand Down
Loading