From 067521981e9438024ef7cc5590198b877e946eb7 Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Fri, 2 Apr 2021 21:08:03 +0200
Subject: [PATCH] v2 etcdctl backup: producing consistent state of membership

---
 etcdctl/ctlv2/command/backup_command.go     | 120 +++++++++++++-------
 etcdctl/snapshot/v3_snapshot.go             |  48 +++++---
 server/etcdserver/api/membership/cluster.go |  17 +++
 server/etcdserver/api/membership/member.go  |   7 +-
 server/etcdserver/api/membership/store.go   |  38 ++++++-
 5 files changed, 167 insertions(+), 63 deletions(-)

diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go
index a425961d980..6004ee8c721 100644
--- a/etcdctl/ctlv2/command/backup_command.go
+++ b/etcdctl/ctlv2/command/backup_command.go
@@ -15,7 +15,6 @@
 package command
 
 import (
-	"encoding/binary"
 	"log"
 	"os"
 	"path"
@@ -25,11 +24,15 @@ import (
 
 	"go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/client/pkg/v3/fileutil"
+	"go.etcd.io/etcd/client/pkg/v3/types"
 	"go.etcd.io/etcd/pkg/v3/idutil"
 	"go.etcd.io/etcd/pkg/v3/pbutil"
 	"go.etcd.io/etcd/raft/v3/raftpb"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
+	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
+	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
+	"go.etcd.io/etcd/server/v3/mvcc/backend"
 	"go.etcd.io/etcd/server/v3/wal"
 	"go.etcd.io/etcd/server/v3/wal/walpb"
 
@@ -54,11 +57,41 @@ func NewBackupCommand() cli.Command {
 	}
 }
 
+type desiredCluster struct {
+	clusterId types.ID
+	nodeId    types.ID
+	members   []*membership.Member
+	confState raftpb.ConfState
+}
+
+func newDesiredCluster() desiredCluster {
+	idgen := idutil.NewGenerator(0, time.Now())
+	nodeID := idgen.Next()
+	clusterID := idgen.Next()
+
+	return desiredCluster{
+		clusterId: types.ID(clusterID),
+		nodeId:    types.ID(nodeID),
+		members: []*membership.Member{
+			{
+				ID: types.ID(nodeID),
+				Attributes: membership.Attributes{
+					Name: "etcdctl-v2-backup",
+				},
+				RaftAttributes: membership.RaftAttributes{
+					PeerURLs: []string{"http://use-flag--force-new-cluster:2080"},
+				}}},
+		confState: raftpb.ConfState{Voters: []uint64{nodeID}},
+	}
+}
+
 // handleBackup handles a request that intends to do a backup.
 func handleBackup(c *cli.Context) error {
 	var srcWAL string
 	var destWAL string
+	lg := zap.NewExample()
+
 	withV3 := c.Bool("with-v3")
 	srcSnap := filepath.Join(c.String("data-dir"), "member", "snap")
 	destSnap := filepath.Join(c.String("backup-dir"), "member", "snap")
 
@@ -79,13 +112,12 @@ func handleBackup(c *cli.Context) error {
 		log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
 	}
 
-	walsnap := saveSnap(destSnap, srcSnap)
-	metadata, state, ents := loadWAL(srcWAL, walsnap, withV3)
-	saveDB(filepath.Join(destSnap, "db"), filepath.Join(srcSnap, "db"), state.Commit, withV3)
+	desired := newDesiredCluster()
 
-	idgen := idutil.NewGenerator(0, time.Now())
-	metadata.NodeID = idgen.Next()
-	metadata.ClusterID = idgen.Next()
+	walsnap := saveSnap(lg, destSnap, srcSnap, &desired)
+	metadata, state, ents := loadWAL(srcWAL, walsnap, withV3)
+	destDbPath := filepath.Join(destSnap, "db")
+	saveDB(lg, destDbPath, filepath.Join(srcSnap, "db"), state.Commit, &desired, withV3)
 
 	neww, err := wal.Create(zap.NewExample(), destWAL, pbutil.MustMarshal(&metadata))
 	if err != nil {
@@ -102,15 +134,17 @@ func handleBackup(c *cli.Context) error {
 	return nil
 }
 
-func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
-	ss := snap.New(zap.NewExample(), srcSnap)
+func saveSnap(lg *zap.Logger, destSnap, srcSnap string, desired *desiredCluster) (walsnap walpb.Snapshot) {
+	ss := snap.New(lg, srcSnap)
 	snapshot, err := ss.Load()
 	if err != nil && err != snap.ErrNoSnapshot {
 		log.Fatal(err)
 	}
 	if snapshot != nil {
-		walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState
-		newss := snap.New(zap.NewExample(), destSnap)
+		walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &desired.confState
+		newss := snap.New(lg, destSnap)
+		snapshot.Metadata.ConfState = desired.confState
+		snapshot.Data = mustTranslateV2store(lg, snapshot.Data, desired)
 		if err = newss.SaveSnap(*snapshot); err != nil {
 			log.Fatal(err)
 		}
@@ -118,6 +152,26 @@ func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
 	return walsnap
 }
 
+// mustTranslateV2store processes storeData such that they match 'desiredCluster'.
+// In particular the method overrides membership information.
+func mustTranslateV2store(lg *zap.Logger, storeData []byte, desired *desiredCluster) []byte {
+	st := v2store.New()
+	if err := st.Recovery(storeData); err != nil {
+		lg.Panic("cannot translate v2store", zap.Error(err))
+	}
+
+	raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
+	raftCluster.SetID(desired.nodeId, desired.clusterId)
+	raftCluster.SetStore(st)
+	raftCluster.PushMembershipToStorage()
+
+	outputData, err := st.Save()
+	if err != nil {
+		lg.Panic("cannot save v2store", zap.Error(err))
+	}
+	return outputData
+}
+
 func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
 	w, err := wal.OpenForRead(zap.NewExample(), srcWAL, walsnap)
 	if err != nil {
@@ -190,7 +244,8 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad
 }
 
 // saveDB copies the v3 backend and strips cluster information.
-func saveDB(destDB, srcDB string, idx uint64, v3 bool) {
+func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) {
+
 	// open src db to safely copy db state
 	if v3 {
 		var src *bolt.DB
@@ -229,37 +284,26 @@ func saveDB(destDB, srcDB string, idx uint64, v3 bool) {
 		}
 	}
 
-	db, err := bolt.Open(destDB, 0644, &bolt.Options{})
-	if err != nil {
-		log.Fatal(err)
-	}
-	tx, err := db.Begin(true)
-	if err != nil {
+	be := backend.NewDefaultBackend(destDB)
+	defer be.Close()
+
+	if err := membership.TrimClusterFromBackend(be); err != nil {
 		log.Fatal(err)
 	}
 
-	// remove membership information; should be clobbered by --force-new-cluster
-	// TODO: Consider refactoring to use backend.Backend instead of bolt
-	// and membership.TrimMembershipFromBackend.
-	for _, bucket := range []string{"members", "members_removed", "cluster"} {
-		tx.DeleteBucket([]byte(bucket))
-	}
+	raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
+	raftCluster.SetID(desired.nodeId, desired.clusterId)
+	raftCluster.SetBackend(be)
+	raftCluster.PushMembershipToStorage()
 
-	// update consistent index to match hard state
 	if !v3 {
-		idxBytes := make([]byte, 8)
-		binary.BigEndian.PutUint64(idxBytes, idx)
-		b, err := tx.CreateBucketIfNotExists([]byte("meta"))
-		if err != nil {
-			log.Fatal(err)
-		}
-		b.Put([]byte("consistent_index"), idxBytes)
+		tx := be.BatchTx()
+		tx.Lock()
+		defer tx.Unlock()
+		tx.UnsafeCreateBucket([]byte("meta"))
+		ci := cindex.NewConsistentIndex(tx)
+		ci.SetConsistentIndex(idx)
+		ci.UnsafeSave(tx)
 	}
 
-	if err := tx.Commit(); err != nil {
-		log.Fatal(err)
-	}
-	if err := db.Close(); err != nil {
-		log.Fatal(err)
-	}
 }
diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go
index 31b31e0ff51..98046954334 100644
--- a/etcdctl/snapshot/v3_snapshot.go
+++ b/etcdctl/snapshot/v3_snapshot.go
@@ -32,7 +32,6 @@ import (
 	"go.etcd.io/etcd/client/pkg/v3/types"
 	"go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/client/v3/snapshot"
-	"go.etcd.io/etcd/pkg/v3/traceutil"
 	"go.etcd.io/etcd/raft/v3"
 	"go.etcd.io/etcd/raft/v3/raftpb"
 	"go.etcd.io/etcd/server/v3/config"
@@ -40,6 +39,7 @@ import (
 	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
+	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
 	"go.etcd.io/etcd/server/v3/mvcc/backend"
 	"go.etcd.io/etcd/server/v3/wal"
 	"go.etcd.io/etcd/server/v3/wal/walpb"
@@ -255,12 +255,19 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
 		zap.String("snap-dir", s.snapDir),
 		zap.Stack("stack"),
 	)
+
 	if err = s.saveDB(); err != nil {
 		return err
 	}
-	if err = s.saveWALAndSnap(); err != nil {
+	hardstate, err := s.saveWALAndSnap()
+	if err != nil {
+		return err
+	}
+
+	if err := s.updateCIndex(hardstate.Commit); err != nil {
 		return err
 	}
+
 	s.lg.Info(
 		"restored snapshot",
 		zap.String("path", s.srcDbPath),
@@ -295,7 +302,7 @@ func (s *v3Manager) saveDB() error {
 }
 
 func (s *v3Manager) copyAndVerifyDB() error {
-	srcf, ferr := os.OpenFile(s.srcDbPath, os.O_RDONLY, 0600)
+	srcf, ferr := os.Open(s.srcDbPath)
 	if ferr != nil {
 		return ferr
 	}
@@ -373,9 +380,9 @@ func (s *v3Manager) copyAndVerifyDB() error {
 // saveWALAndSnap creates a WAL for the initial cluster
 //
 // TODO: This code ignores learners !!!
-func (s *v3Manager) saveWALAndSnap() error {
+func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
 	if err := fileutil.CreateDirAll(s.walDir); err != nil {
-		return err
+		return nil, err
 	}
 
 	// add members again to persist them to the store we create.
@@ -392,11 +399,11 @@ func (s *v3Manager) saveWALAndSnap() error {
 	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
 	metadata, merr := md.Marshal()
 	if merr != nil {
-		return merr
+		return nil, merr
 	}
 	w, walerr := wal.Create(s.lg, s.walDir, metadata)
 	if walerr != nil {
-		return walerr
+		return nil, walerr
 	}
 	defer w.Close()
 
@@ -404,7 +411,7 @@ func (s *v3Manager) saveWALAndSnap() error {
 	for i, id := range s.cl.MemberIDs() {
 		ctx, err := json.Marshal((*s.cl).Member(id))
 		if err != nil {
-			return err
+			return nil, err
 		}
 		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
 	}
@@ -420,7 +427,7 @@ func (s *v3Manager) saveWALAndSnap() error {
 		}
 		d, err := cc.Marshal()
 		if err != nil {
-			return err
+			return nil, err
 		}
 		ents[i] = raftpb.Entry{
 			Type:  raftpb.EntryConfChange,
@@ -431,17 +438,18 @@ func (s *v3Manager) saveWALAndSnap() error {
 	}
 
 	commit, term := uint64(len(ents)), uint64(1)
-	if err := w.Save(raftpb.HardState{
+	hardState := raftpb.HardState{
 		Term:   term,
 		Vote:   peers[0].ID,
 		Commit: commit,
-	}, ents); err != nil {
-		return err
+	}
+	if err := w.Save(hardState, ents); err != nil {
+		return nil, err
 	}
 
 	b, berr := st.Save()
 	if berr != nil {
-		return berr
+		return nil, berr
 	}
 	confState := raftpb.ConfState{
 		Voters: nodeIDs,
@@ -456,7 +464,17 @@ func (s *v3Manager) saveWALAndSnap() error {
 	}
 	sn := snap.New(s.lg, s.snapDir)
 	if err := sn.SaveSnap(raftSnap); err != nil {
-		return err
+		return nil, err
 	}
-	return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term, ConfState: &confState})
+	snapshot := walpb.Snapshot{Index: commit, Term: term, ConfState: &confState}
+	return &hardState, w.SaveSnapshot(snapshot)
+}
+
+func (s *v3Manager) updateCIndex(commit uint64) error {
+	be := backend.NewDefaultBackend(s.outDbPath())
+	defer be.Close()
+	ci := cindex.NewConsistentIndex(be.BatchTx())
+	ci.SetConsistentIndex(commit)
+	ci.UnsafeSave(be.BatchTx())
+	return nil
 }
diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go
index 7697b3d42b0..c335c7db240 100644
--- a/server/etcdserver/api/membership/cluster.go
+++ b/server/etcdserver/api/membership/cluster.go
@@ -859,3 +859,20 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
 	sort.Sort(types.IDSlice(ids))
 	return ids
 }
+
+// PushMembershipToStorage overrides the storage information about the cluster's
+// members, so that it fully reflects the internal state of RaftCluster.
+func (c *RaftCluster) PushMembershipToStorage() {
+	if c.be != nil {
+		TrimMembershipFromBackend(c.lg, c.be)
+		for _, m := range c.members {
+			mustSaveMemberToBackend(c.lg, c.be, m)
+		}
+	}
+	if c.v2store != nil {
+		TrimMembershipFromV2Store(c.lg, c.v2store)
+		for _, m := range c.members {
+			mustSaveMemberToStore(c.lg, c.v2store, m)
+		}
+	}
+}
diff --git a/server/etcdserver/api/membership/member.go b/server/etcdserver/api/membership/member.go
index efecf9acc2f..696549d2d26 100644
--- a/server/etcdserver/api/membership/member.go
+++ b/server/etcdserver/api/membership/member.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"math/rand"
 	"sort"
+	"strings"
 	"time"
 
 	"go.etcd.io/etcd/client/pkg/v3/types"
@@ -61,12 +62,10 @@ func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, no
 }
 
 func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
-	var b []byte
 	peerURLstrs := peerURLs.StringSlice()
 	sort.Strings(peerURLstrs)
-	for _, p := range peerURLstrs {
-		b = append(b, []byte(p)...)
-	}
+	joinedPeerUrls := strings.Join(peerURLstrs, "")
+	b := []byte(joinedPeerUrls)
 
 	b = append(b, []byte(clusterName)...)
 	if now != nil {
diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go
index c4b5a2d604c..96b20cb0e69 100644
--- a/server/etcdserver/api/membership/store.go
+++ b/server/etcdserver/api/membership/store.go
@@ -57,6 +57,8 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
 	tx.UnsafePut(membersBucketName, mkey, mvalue)
 }
 
+// TrimClusterFromBackend removes all information about cluster (versions)
+// from the v3 backend.
 func TrimClusterFromBackend(be backend.Backend) error {
 	tx := be.BatchTx()
 	tx.Lock()
@@ -83,7 +85,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M
 	tx.RLock()
 	defer tx.RUnlock()
 	err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
-		memberId := MustParseMemberIDFromBytes(lg, k)
+		memberId := mustParseMemberIDFromBytes(lg, k)
 		m := &Member{ID: memberId}
 		if err := json.Unmarshal(v, &m); err != nil {
 			return err
@@ -96,7 +98,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M
 	}
 
 	err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
-		memberId := MustParseMemberIDFromBytes(lg, k)
+		memberId := mustParseMemberIDFromBytes(lg, k)
 		removed[memberId] = true
 		return nil
 	})
@@ -114,24 +116,48 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I
 	return members, removed
 }
 
+// TrimMembershipFromBackend removes all information about members &
+// removed_members from the v3 backend.
 func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
+	lg.Info("Trimming membership information from the backend...")
 	tx := be.BatchTx()
 	tx.Lock()
 	defer tx.Unlock()
 
 	err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
 		tx.UnsafeDelete(membersBucketName, k)
+		lg.Debug("Removed member from the backend",
+			zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
 		return nil
 	})
 	if err != nil {
 		return err
 	}
 
-	err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
+	return tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
 		tx.UnsafeDelete(membersRemovedBucketName, k)
+		lg.Debug("Removed removed_member from the backend",
+			zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
 		return nil
 	})
-	if err != nil {
-		return err
+}
+
+// TrimMembershipFromV2Store removes all information about members &
+// removed_members from the v2 store.
+func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
+	members, removed := membersFromStore(lg, s)
+
+	for mID := range members {
+		_, err := s.Delete(MemberStoreKey(mID), true, true)
+		if err != nil {
+			return err
+		}
+	}
+	for mID := range removed {
+		_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
+		if err != nil {
+			return err
+		}
 	}
+	return nil
 }
@@ -289,7 +315,7 @@ func MemberAttributesStorePath(id types.ID) string {
 	return path.Join(MemberStoreKey(id), attributesSuffix)
 }
 
-func MustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
+func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
 	id, err := types.IDFromString(string(key))
 	if err != nil {
 		lg.Panic("failed to parse member id from key", zap.Error(err))