Skip to content

Commit

Permalink
server: Save consistency index and term to backend even when they dec…
Browse files Browse the repository at this point in the history
…rease

Reason to store CI and term in backend was to make db fully independent
snapshot, it was never meant to interfere with apply logic. Skip of CI
was introduced for v2->v3 migration where we wanted to prevent it from
decreasing when replaying wal in
#5391. By mistake it was added to
apply flow during refactor in
#12855 (comment).

Consistency index and term should only be negotiated and used by raft to make
decisions. Their values should only driven by raft state machine and
backend should only be responsible for storing them.
  • Loading branch information
serathius committed Apr 7, 2022
1 parent a5b9f72 commit 1ea53d5
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 34 deletions.
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeCreateMetaBucket(tx)
schema.UnsafeUpdateConsistentIndex(tx, idx, term, false)
schema.UnsafeUpdateConsistentIndex(tx, idx, term)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.
Expand Down
2 changes: 1 addition & 1 deletion etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
defer be.Close()

cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
return nil
}
4 changes: 2 additions & 2 deletions server/etcdserver/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotI
// create snapshot db file: "%016x.snap.db"
be := serverstorage.OpenBackend(cfg, nil)
schema.CreateMetaBucket(be.BatchTx())
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), snapshotIndex, snapshotTerm, false)
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), snapshotIndex, snapshotTerm)
schema.MustUnsafeSaveConfStateToBackend(cfg.Logger, be.BatchTx(), &confState)
if err = be.Close(); err != nil {
return
Expand All @@ -301,6 +301,6 @@ func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotI
// create backend db file
be = serverstorage.OpenBackend(cfg, nil)
schema.CreateMetaBucket(be.BatchTx())
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), 1, 1, false)
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), 1, 1)
return be.Close()
}
7 changes: 3 additions & 4 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

type Backend interface {
ReadTx() backend.ReadTx
BatchTx() backend.BatchTx
}

// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
Expand Down Expand Up @@ -119,7 +118,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
term := atomic.LoadUint64(&ci.term)
schema.UnsafeUpdateConsistentIndex(tx, index, term, true)
schema.UnsafeUpdateConsistentIndex(tx, index, term)
}

func (ci *consistentIndex) SetBackend(be Backend) {
Expand Down Expand Up @@ -170,8 +169,8 @@ func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}

func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
schema.UnsafeUpdateConsistentIndex(tx, index, term)
}
52 changes: 52 additions & 0 deletions server/etcdserver/cindex/cindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,58 @@ func TestConsistentIndex(t *testing.T) {
assert.Equal(t, r, index)
}

func TestConsistentIndexDecrease(t *testing.T) {
initIndex := uint64(100)
initTerm := uint64(10)

tcs := []struct {
name string
index uint64
term uint64
}{
{
name: "Decrease term",
index: initIndex + 1,
term: initTerm - 1,
},
{
name: "Decrease CI",
index: initIndex - 1,
term: initTerm + 1,
},
{
name: "Decrease CI and term",
index: initIndex - 1,
term: initTerm - 1,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
tx.Lock()
schema.UnsafeCreateMetaBucket(tx)
schema.UnsafeUpdateConsistentIndex(tx, initIndex, initTerm)
tx.Unlock()
be.ForceCommit()
be.Close()

be = backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath)
defer be.Close()
ci := NewConsistentIndex(be)
ci.SetConsistentIndex(tc.index, tc.term)
tx = be.BatchTx()
tx.Lock()
ci.UnsafeSave(tx)
tx.Unlock()
assert.Equal(t, tc.index, ci.ConsistentIndex())

ci = NewConsistentIndex(be)
assert.Equal(t, tc.index, ci.ConsistentIndex())
})
}
}

func TestFakeConsistentIndex(t *testing.T) {

r := rand.Uint64()
Expand Down
24 changes: 2 additions & 22 deletions server/storage/schema/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package schema

import (
"encoding/binary"

"go.etcd.io/etcd/server/v3/storage/backend"
)

Expand Down Expand Up @@ -56,32 +57,11 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
return UnsafeReadConsistentIndex(tx)
}

func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
if index == 0 {
// Never save 0 as it means that we didn't load the real index yet.
return
}

if onlyGrow {
oldi, oldTerm := UnsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if index > oldi {
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
}
if term > 0 && term > oldTerm {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(Meta, MetaTermKeyName, bs2)
}
return
}

bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
Expand Down
8 changes: 4 additions & 4 deletions server/storage/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestValidate(t *testing.T) {
version: V3_5,
overrideKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeUpdateConsistentIndex(tx, 1, 1)
},
},
{
Expand Down Expand Up @@ -313,14 +313,14 @@ func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx
case V3_4:
case V3_5:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeUpdateConsistentIndex(tx, 1, 1)
case V3_6:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeUpdateConsistentIndex(tx, 1, 1)
UnsafeSetStorageVersion(tx, &V3_6)
case V3_7:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeUpdateConsistentIndex(tx, 1, 1)
UnsafeSetStorageVersion(tx, &V3_7)
tx.UnsafePut(Meta, []byte("future-key"), []byte(""))
default:
Expand Down

0 comments on commit 1ea53d5

Please sign in to comment.