Skip to content

Commit

Permalink
fix the data inconsistency by moving the SetConsistentIndex into the …
Browse files Browse the repository at this point in the history
…transaction lock
  • Loading branch information
ahrtr committed Mar 28, 2022
1 parent be29295 commit acc74bf
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 7 deletions.
12 changes: 10 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,7 @@ func (s *EtcdServer) apply(

// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.setUnlockCallback(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}

Expand All @@ -1795,13 +1795,21 @@ func (s *EtcdServer) apply(
return appliedt, appliedi, shouldStop
}

func (s *EtcdServer) setUnlockCallback(index, term uint64) {
if s.be != nil {
s.be.SetOnPreUnlockFunc(func() {
s.consistIndex.SetConsistentIndex(index, term)
})
}
}

// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.setUnlockCallback(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
s.lg.Debug("apply entry normal",
Expand Down
8 changes: 3 additions & 5 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
}}

_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
assert.Equal(t, uint64(2), appliedi)
consistIndex := srv.consistIndex.ConsistentIndex()
if consistIndex != appliedi {
t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
}

t.Run("verify-backend", func(t *testing.T) {
tx := be.BatchTx()
Expand All @@ -698,9 +696,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := schema.ReadConsistentIndex(be.BatchTx())
rindex, _ := schema.ReadConsistentIndex(be.BatchTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)

}

func realisticRaftNode(lg *zap.Logger) *raftNode {
Expand Down
22 changes: 22 additions & 0 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type Backend interface {
Defrag() error
ForceCommit()
Close() error

// SetOnPreUnlockFunc sets a callback function which is called before unlocking the batchTx.
SetOnPreUnlockFunc(func())
OnPreUnlock()
}

type Snapshot interface {
Expand Down Expand Up @@ -119,9 +123,27 @@ type backend struct {

hooks Hooks

muUnlock sync.Mutex
onUnlock func()

lg *zap.Logger
}

func (b *backend) SetOnPreUnlockFunc(f func()) {
b.muUnlock.Lock()
defer b.muUnlock.Unlock()
b.onUnlock = f
}

func (b *backend) OnPreUnlock() {
b.muUnlock.Lock()
defer b.muUnlock.Unlock()
if b.onUnlock != nil {
b.onUnlock()
b.onUnlock = nil
}
}

type BackendConfig struct {
// Path is the file path to the backend file.
Path string
Expand Down
1 change: 1 addition & 0 deletions server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (t *batchTx) Lock() {
}

func (t *batchTx) Unlock() {
t.backend.OnPreUnlock()
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
Expand Down
2 changes: 2 additions & 0 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,8 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetOnPreUnlockFunc(func()) {}
func (b *fakeBackend) OnPreUnlock() {}

type indexGetResp struct {
rev revision
Expand Down

0 comments on commit acc74bf

Please sign in to comment.