Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persists Term in the (bbolt) Backend #12964

Merged
merged 1 commit into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
### Storage format changes
- [WAL log's snapshots persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12735)
- [Backend persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12962) in the `meta` bucket `confState` key.
- Backend persists downgrade in the `cluster` bucket
- [Backend persists applied term](https://github.com/etcd-io/etcd/pull/12964) in the `meta` bucket `term` key.
- Backend persists `downgrade` in the `cluster` bucket

### Security

Expand Down
6 changes: 3 additions & 3 deletions etcdctl/ctlv2/command/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func handleBackup(c *cli.Context) error {

walsnap := saveSnap(lg, destSnap, srcSnap, &desired)
metadata, state, ents := translateWAL(lg, srcWAL, walsnap, withV3)
saveDB(lg, destDbPath, srcDbPath, state.Commit, &desired, withV3)
saveDB(lg, destDbPath, srcDbPath, state.Commit, state.Term, &desired, withV3)

neww, err := wal.Create(lg, destWAL, pbutil.MustMarshal(&metadata))
if err != nil {
Expand Down Expand Up @@ -265,7 +265,7 @@ func raftEntryToNoOp(entry *raftpb.Entry) {
}

// saveDB copies the v3 backend and strips cluster information.
func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) {
func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desired *desiredCluster, v3 bool) {

// open src db to safely copy db state
if v3 {
Expand Down Expand Up @@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
tx.Lock()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, false)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.
Expand Down
15 changes: 8 additions & 7 deletions etcdctl/ctlv3/command/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
writer, reader, errc = defaultTransformer()
}

st, index := rebuildStoreV2()
st, index, term := rebuildStoreV2()
be := prepareBackend()
defer be.Close()

Expand All @@ -92,7 +92,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
}()

readKeys(reader, be)
cindex.UpdateConsistentIndex(be.BatchTx(), index, true)
cindex.UpdateConsistentIndex(be.BatchTx(), index, term, true)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
Expand Down Expand Up @@ -127,8 +127,7 @@ func prepareBackend() backend.Backend {
return be
}

func rebuildStoreV2() (v2store.Store, uint64) {
var index uint64
func rebuildStoreV2() (st v2store.Store, index uint64, term uint64) {
cl := membership.NewCluster(zap.NewExample())

waldir := migrateWALdir
Expand All @@ -147,6 +146,7 @@ func rebuildStoreV2() (v2store.Store, uint64) {
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
index = snapshot.Metadata.Index
term = snapshot.Metadata.Term
}

w, err := wal.OpenForRead(zap.NewExample(), waldir, walsnap)
Expand All @@ -160,7 +160,7 @@ func rebuildStoreV2() (v2store.Store, uint64) {
ExitWithError(ExitError, err)
}

st := v2store.New()
st = v2store.New()
if snapshot != nil {
err := st.Recovery(snapshot.Data)
if err != nil {
Expand Down Expand Up @@ -191,12 +191,13 @@ func rebuildStoreV2() (v2store.Store, uint64) {
applyRequest(req, applier)
}
}
if ent.Index > index {
if ent.Index >= index {
index = ent.Index
term = ent.Term
}
}

return st, index
return st, index, term
}

func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
Expand Down
6 changes: 3 additions & 3 deletions etcdctl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
return err
}

if err := s.updateCIndex(hardstate.Commit); err != nil {
if err := s.updateCIndex(hardstate.Commit, hardstate.Term); err != nil {
return err
}

Expand Down Expand Up @@ -475,10 +475,10 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
return &hardState, w.SaveSnapshot(snapshot)
}

func (s *v3Manager) updateCIndex(commit uint64) error {
func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()

cindex.UpdateConsistentIndex(be.BatchTx(), commit, false)
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
return nil
}
2 changes: 1 addition & 1 deletion server/etcdserver/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex = cindex.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil
Expand Down
73 changes: 49 additions & 24 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
MetaBucketName = []byte("meta")

ConsistentIndexKeyName = []byte("consistent_index")
TermKeyName = []byte("term")
)

type Backend interface {
Expand All @@ -39,7 +40,7 @@ type ConsistentIndexer interface {
ConsistentIndex() uint64

// SetConsistentIndex sets the consistent index and term of the currently executing entry.
SetConsistentIndex(v uint64)
SetConsistentIndex(v uint64, term uint64)

// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
Expand All @@ -52,9 +53,13 @@ type ConsistentIndexer interface {
// consistentIndex implements the ConsistentIndexer interface.
type consistentIndex struct {
// consistentIndex represents the offset of an entry in a consistent replica log.
// it caches the "consistent_index" key's value.
// It caches the "consistent_index" key's value.
// Accessed through atomics so must be 64-bit aligned.
consistentIndex uint64
// term represents the Raft term of the committed entry in a consistent replica log.
// Accessed through atomics so must be 64-bit aligned.
// The value is being persisted in the backend since v3.5.
term uint64

// be is used for initial read consistentIndex
be Backend
Expand All @@ -75,38 +80,44 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()

v := ReadConsistentIndex(ci.be.BatchTx())
atomic.StoreUint64(&ci.consistentIndex, v)
v, term := ReadConsistentIndex(ci.be.BatchTx())
ci.SetConsistentIndex(v, term)
return v
}

func (ci *consistentIndex) SetConsistentIndex(v uint64) {
func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
atomic.StoreUint64(&ci.consistentIndex, v)
atomic.StoreUint64(&ci.term, term)
}

func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
UnsafeUpdateConsistentIndex(tx, index, true)
term := atomic.LoadUint64(&ci.term)
UnsafeUpdateConsistentIndex(tx, index, term, true)
}

func (ci *consistentIndex) SetBackend(be Backend) {
ci.mutex.Lock()
defer ci.mutex.Unlock()
ci.be = be
// After the backend is changed, the first access should re-read it.
ci.SetConsistentIndex(0)
ci.SetConsistentIndex(0, 0)
}

func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}

type fakeConsistentIndex struct{ index uint64 }
type fakeConsistentIndex struct {
index uint64
term uint64
}

func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }

func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}

func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
Expand All @@ -124,47 +135,61 @@ func CreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(MetaBucketName)
}

// unsafeGetConsistentIndex loads consistent index from given transaction.
// returns 0 if the data are not found.
func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
// unsafeReadConsistentIndex loads consistent index & term from given transaction.
// Returns 0,0 if the consistent index is not found, and index,0 if only the
// term is missing (term is persisted since v3.5).
func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
_, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
return 0, 0
}
v := binary.BigEndian.Uint64(vs[0])
return v
_, ts := tx.UnsafeRange(MetaBucketName, TermKeyName, nil, 0)
if len(ts) == 0 {
return v, 0
}
t := binary.BigEndian.Uint64(ts[0])
return v, t
}

// ReadConsistentIndex loads consistent index from given transaction.
// ReadConsistentIndex loads consistent index and term from given transaction.
// Returns 0,0 if the data are not found.
func ReadConsistentIndex(tx backend.ReadTx) uint64 {
func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
tx.Lock()
defer tx.Unlock()
return unsafeReadConsistentIndex(tx)
}

func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
if index == 0 {
// Never save 0 as it means that we didn't load the real index yet.
return
}

if onlyGrow {
oldi := unsafeReadConsistentIndex(tx)
if index <= oldi {
oldi, oldTerm := unsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}

bs := make([]byte, 8) // this is kept on stack (not heap) so its quick.
binary.BigEndian.PutUint64(bs, index)
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs1)
if term > 0 {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(MetaBucketName, TermKeyName, bs2)
}
}

func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
defer tx.Unlock()
UnsafeUpdateConsistentIndex(tx, index, onlyGrow)
UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}
16 changes: 7 additions & 9 deletions server/etcdserver/cindex/cindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
)
Expand All @@ -38,8 +39,9 @@ func TestConsistentIndex(t *testing.T) {
UnsafeCreateMetaBucket(tx)
tx.Unlock()
be.ForceCommit()
r := rand.Uint64()
ci.SetConsistentIndex(r)
r := uint64(7890123)
term := uint64(234)
ci.SetConsistentIndex(r, term)
index := ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
Expand All @@ -54,15 +56,11 @@ func TestConsistentIndex(t *testing.T) {
defer b.Close()
ci.SetBackend(b)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
assert.Equal(t, r, index)

ci = NewConsistentIndex(b)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
assert.Equal(t, r, index)
}

func TestFakeConsistentIndex(t *testing.T) {
Expand All @@ -74,7 +72,7 @@ func TestFakeConsistentIndex(t *testing.T) {
t.Errorf("expected %d,got %d", r, index)
}
r = rand.Uint64()
ci.SetConsistentIndex(r)
ci.SetConsistentIndex(r, 5)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2124,7 +2124,7 @@ func (s *EtcdServer) apply(

// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentIndex(e.Index)
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}

Expand Down Expand Up @@ -2154,7 +2154,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentIndex(e.Index)
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
s.lg.Debug("apply entry normal",
Expand Down
5 changes: 4 additions & 1 deletion server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
cc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: b}
ents := []raftpb.Entry{{
Index: 2,
Term: 4,
Type: raftpb.EntryConfChange,
Data: pbutil.MustMarshal(cc),
}}
Expand All @@ -695,7 +696,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
})
assert.Equal(t, consistIndex, cindex.ReadConsistentIndex(be.BatchTx()))
rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)
}

func realisticRaftNode(lg *zap.Logger) *raftNode {
Expand Down
1 change: 1 addition & 0 deletions server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func init() {
// consistent index might be changed due to v2 internal sync, which
// is not controllable by the user.
{Bucket: string(MetaBucketName), Key: string(cindex.ConsistentIndexKeyName)}: {},
{Bucket: string(MetaBucketName), Key: string(cindex.TermKeyName)}: {},
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/mvcc/kvstore_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func BenchmarkConsistentIndex(b *testing.B) {
defer betesting.Close(b, be)

// This will force the index to be reread from scratch on each call.
ci.SetConsistentIndex(0)
ci.SetConsistentIndex(0, 0)

tx := be.BatchTx()
tx.Lock()
Expand Down
Loading