Skip to content

Commit

Permalink
move the consistentIdx and consistentTerm from Etcdserver to cindex p…
Browse files Browse the repository at this point in the history
…ackage

Removed the fields consistentIdx and consistentTerm from struct EtcdServer,
and added applyingIndex and applyingTerm into struct consistentIndex in
package cindex. We may remove the two fields completely if we decide to
remove the OnPreCommitUnsafe, and it will depend on the performance test
result.
  • Loading branch information
ahrtr committed Apr 7, 2022
1 parent e155e50 commit d6d7c53
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 21 deletions.
37 changes: 35 additions & 2 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,18 @@ type ConsistentIndexer interface {
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64

// ConsistentApplyingIndex returns the consistent applying index of current executing entry.
ConsistentApplyingIndex() (uint64, uint64)

// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
UnsafeConsistentIndex() uint64

// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64, term uint64)

// SetConsistentApplyingIndex set the consistent applying index of current executing entry.
SetConsistentApplyingIndex(v uint64, term uint64)

// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
UnsafeSave(tx backend.BatchTx)
Expand All @@ -58,6 +64,19 @@ type consistentIndex struct {
// The value is being persisted in the backend since v3.5.
term uint64

// applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index
// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
// saved to consistentIndex and term above in the txPostLockHook.
//
// TODO(ahrtr): try to remove the OnPreCommitUnsafe, and compare the
// performance difference. Afterwards we can make a decision on whether
// or not we should remove OnPreCommitUnsafe. If it is true, then we
// can remove applyingIndex and applyingTerm, and save the e.Index and
// e.Term to consistentIndex and term directly in applyEntries, and
// persist them into db in the txPostLockHook.
applyingIndex uint64
applyingTerm uint64

// be is used for initial read consistentIndex
be Backend
// mutex is protecting be.
Expand Down Expand Up @@ -111,6 +130,15 @@ func (ci *consistentIndex) SetBackend(be Backend) {
ci.SetConsistentIndex(0, 0)
}

func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
}

func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
atomic.StoreUint64(&ci.applyingIndex, v)
atomic.StoreUint64(&ci.applyingTerm, term)
}

func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}
Expand All @@ -120,13 +148,18 @@ type fakeConsistentIndex struct {
term uint64
}

func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) { return f.index, f.term }
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index }

func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}
func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}

func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
Expand Down
28 changes: 9 additions & 19 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,8 @@ type EtcdServer struct {
term uint64 // must use atomic operations to access; keep 64-bit aligned.
lead uint64 // must use atomic operations to access; keep 64-bit aligned.

consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned.
consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned.
consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
r raftNode // uses 64-bit atomics; keep 64-bit aligned.
consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
r raftNode // uses 64-bit atomics; keep 64-bit aligned.

readych chan struct{}
Cfg config.ServerConfig
Expand Down Expand Up @@ -1555,15 +1553,6 @@ func (s *EtcdServer) getTerm() uint64 {
return atomic.LoadUint64(&s.term)
}

func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) {
atomic.StoreUint64(&s.consistentIdx, cIdx)
atomic.StoreUint64(&s.consistentTerm, cTerm)
}

func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) {
return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm)
}

func (s *EtcdServer) setLead(v uint64) {
atomic.StoreUint64(&s.lead, v)
}
Expand Down Expand Up @@ -1788,7 +1777,7 @@ func (s *EtcdServer) apply(

// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.setConsistentIndexAndTerm(e.Index, e.Term)
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}

Expand Down Expand Up @@ -1826,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.setConsistentIndexAndTerm(e.Index, e.Term)
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
s.lg.Debug("apply entry normal",
Expand Down Expand Up @@ -1925,7 +1914,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
// The txPostLock callback will not get called in this case,
// so we should set the consistent index directly.
if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
s.consistIndex.SetConsistentIndex(s.consistentIdx, s.consistentTerm)
applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
}
return false, err
}
Expand Down Expand Up @@ -2331,9 +2321,9 @@ func (s *EtcdServer) Version() *serverversion.Manager {

func (s *EtcdServer) getTxPostLockHook() func() {
return func() {
cIdx, term := s.getConsistentIndexAndTerm()
if cIdx > s.consistIndex.UnsafeConsistentIndex() {
s.consistIndex.SetConsistentIndex(cIdx, term)
applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
}
}
}

0 comments on commit d6d7c53

Please sign in to comment.