From 962d0e30702702c41d0b2048201b65ea6603ec0f Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 2 Apr 2021 21:08:03 +0200 Subject: [PATCH] v2 etcdctl backup: producing consistent state of membership --- etcdctl/ctlv2/command/backup_command.go | 123 ++++++++++++++------ server/etcdserver/api/membership/cluster.go | 17 +++ server/etcdserver/api/membership/store.go | 31 ++++- 3 files changed, 130 insertions(+), 41 deletions(-) diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go index a662e1c47cfa..795c9f2b21a9 100644 --- a/etcdctl/ctlv2/command/backup_command.go +++ b/etcdctl/ctlv2/command/backup_command.go @@ -15,7 +15,6 @@ package command import ( - "encoding/binary" "log" "os" "path" @@ -27,9 +26,13 @@ import ( "go.etcd.io/etcd/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/idutil" "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/pkg/v3/types" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" + "go.etcd.io/etcd/server/v3/etcdserver/cindex" + "go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -54,11 +57,41 @@ func NewBackupCommand() cli.Command { } } +type desiredCluster struct { + clusterId types.ID + nodeId types.ID + members []*membership.Member + confState raftpb.ConfState +} + +func newDesiredCluster() desiredCluster { + idgen := idutil.NewGenerator(0, time.Now()) + nodeID := idgen.Next() + clusterID := idgen.Next() + + return desiredCluster{ + clusterId: types.ID(clusterID), + nodeId: types.ID(nodeID), + members: []*membership.Member{ + { + ID: types.ID(nodeID), + Attributes: membership.Attributes{ + Name: "etcdctl-v2-backup", + }, + RaftAttributes: membership.RaftAttributes{ + PeerURLs: []string{"http://use-flag--force-new-cluster:2080"}, + }}}, + confState: raftpb.ConfState{Voters: []uint64{nodeID}}, + } +} + // handleBackup handles a request that intends to do a backup. func handleBackup(c *cli.Context) error { var srcWAL string var destWAL string + lg := zap.NewExample() + withV3 := c.Bool("with-v3") srcSnap := filepath.Join(c.String("data-dir"), "member", "snap") destSnap := filepath.Join(c.String("backup-dir"), "member", "snap") @@ -79,13 +112,12 @@ func handleBackup(c *cli.Context) error { log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err) } - walsnap := saveSnap(destSnap, srcSnap) - metadata, state, ents := loadWAL(srcWAL, walsnap, withV3) - saveDB(filepath.Join(destSnap, "db"), filepath.Join(srcSnap, "db"), state.Commit, withV3) + desired := newDesiredCluster() - idgen := idutil.NewGenerator(0, time.Now()) - metadata.NodeID = idgen.Next() - metadata.ClusterID = idgen.Next() + walsnap := saveSnap(lg, destSnap, srcSnap, &desired) + metadata, state, ents := loadWAL(srcWAL, walsnap, withV3) + destDbPath := filepath.Join(destSnap, "db") + saveDB(lg, destDbPath, filepath.Join(srcSnap, "db"), state.Commit, &desired, withV3) neww, err := wal.Create(zap.NewExample(), destWAL, pbutil.MustMarshal(&metadata)) if err != nil { @@ -102,15 +134,17 @@ func handleBackup(c *cli.Context) error { return nil } -func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) { - ss := snap.New(zap.NewExample(), srcSnap) +func saveSnap(lg *zap.Logger, destSnap, srcSnap string, desired *desiredCluster) (walsnap walpb.Snapshot) { + ss := snap.New(lg, srcSnap) snapshot, err := ss.Load() if err != nil && err != snap.ErrNoSnapshot { log.Fatal(err) } if snapshot != nil { - walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState - newss := snap.New(zap.NewExample(), destSnap) + walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &desired.confState + newss := snap.New(lg, destSnap) + snapshot.Metadata.ConfState = desired.confState + snapshot.Data = mustTranslateV2store(lg, snapshot.Data, desired) if err = newss.SaveSnap(*snapshot); err != nil { log.Fatal(err) } @@ -118,6 +152,26 @@ func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) { return walsnap } +// mustTranslateV2store processes storeData such that they match 'desiredCluster'. +// In particular the method overrides membership information. +func mustTranslateV2store(lg *zap.Logger, storeData []byte, desired *desiredCluster) []byte { + st := v2store.New() + if err := st.Recovery(storeData); err != nil { + lg.Panic("cannot translate v2store", zap.Error(err)) + } + + raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members) + raftCluster.SetID(desired.nodeId, desired.clusterId) + raftCluster.SetStore(st) + raftCluster.PushMembershipToStorage() + + outputData, err := st.Save() + if err != nil { + lg.Panic("cannot save v2store", zap.Error(err)) + } + return outputData +} + func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) { w, err := wal.OpenForRead(zap.NewExample(), srcWAL, walsnap) if err != nil { @@ -145,6 +199,7 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad i-- } for i = 0; i < len(ents); i++ { + log.Printf("PROCESSING: %v\n\n", ents[i]) ents[i].Index -= removed if ents[i].Type == raftpb.EntryConfChange { log.Println("ignoring EntryConfChange raft entry") @@ -161,6 +216,8 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad pbutil.MustUnmarshal(v2Req, ents[i].Data) } + log.Printf("Req: %v\n\n%v\n\n", raftReq, v2Req) + if v2Req != nil && v2Req.Method == "PUT" && memberAttrRE.MatchString(v2Req.Path) { log.Println("ignoring member attribute update on", v2Req.Path) remove() @@ -184,7 +241,8 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad } // saveDB copies the v3 backend and strips cluster information. -func saveDB(destDB, srcDB string, idx uint64, v3 bool) { +func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) { + // open src db to safely copy db state if v3 { var src *bolt.DB @@ -223,37 +281,26 @@ func saveDB(destDB, srcDB string, idx uint64, v3 bool) { } } - db, err := bolt.Open(destDB, 0644, &bolt.Options{}) - if err != nil { - log.Fatal(err) - } - tx, err := db.Begin(true) - if err != nil { + be := backend.NewDefaultBackend(destDB) + defer be.Close() + + if err := membership.TrimClusterFromBackend(be); err != nil { log.Fatal(err) } - // remove membership information; should be clobbered by --force-new-cluster - // TODO: Consider refactoring to use backend.Backend instead of bolt - // and membership.TrimMembershipFromBackend. - for _, bucket := range []string{"members", "members_removed", "cluster"} { - tx.DeleteBucket([]byte(bucket)) - } + raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members) + raftCluster.SetID(desired.nodeId, desired.clusterId) + raftCluster.SetBackend(be) + raftCluster.PushMembershipToStorage() - // update consistent index to match hard state if !v3 { - idxBytes := make([]byte, 8) - binary.BigEndian.PutUint64(idxBytes, idx) - b, err := tx.CreateBucketIfNotExists([]byte("meta")) - if err != nil { - log.Fatal(err) - } - b.Put([]byte("consistent_index"), idxBytes) + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafeCreateBucket([]byte("meta")) + ci := cindex.NewConsistentIndex(tx) + ci.SetConsistentIndex(idx) + ci.UnsafeSave(tx) } - if err := tx.Commit(); err != nil { - log.Fatal(err) - } - if err := db.Close(); err != nil { - log.Fatal(err) - } } diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 7c071d4841c4..f282dbba1797 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -854,3 +854,20 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID { sort.Sort(types.IDSlice(ids)) return ids } + +// PushMembershipToStorage is overriding storage information about cluster's +// members, such that they fully reflect internal RaftCluster's storage. +func (c *RaftCluster) PushMembershipToStorage() { + if c.be != nil { + TrimMembershipFromBackend(c.lg, c.be) + for _, m := range c.members { + mustSaveMemberToBackend(c.lg, c.be, m) + } + } + if c.v2store != nil { + TrimMembershipFromV2Store(c.lg, c.v2store) + for _, m := range c.members { + mustSaveMemberToStore(c.lg, c.v2store, m) + } + } +} diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index ff7ad29a8c9d..6b5d07b8e187 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -57,6 +57,8 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { tx.UnsafePut(membersBucketName, mkey, mvalue) } +// TrimClusterFromBackend removes all information about cluster (versions) +// from the v3 backend. func TrimClusterFromBackend(be backend.Backend) error { tx := be.BatchTx() tx.Lock() @@ -83,7 +85,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M tx.RLock() defer tx.RUnlock() err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error { - memberId := MustParseMemberIDFromBytes(lg, k) + memberId := mustParseMemberIDFromBytes(lg, k) m := &Member{ID: memberId} if err := json.Unmarshal(v, &m); err != nil { return err @@ -96,7 +98,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M } err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error { - memberId := MustParseMemberIDFromBytes(lg, k) + memberId := mustParseMemberIDFromBytes(lg, k) removed[memberId] = true return nil }) @@ -114,6 +116,8 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I return members, removed } +// TrimMembershipFromBackend removes all information about members & +// removed_members from the v3 backend. func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error { tx := be.BatchTx() tx.Lock() @@ -135,6 +139,27 @@ func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error { return nil } +// TrimMembershipFromV2Store removes all information about members & +// removed_members from the v2 store. +func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error { + members, removed := membersFromStore(lg, s) + + for mID := range members { + _, err := s.Delete(MemberStoreKey(mID), true, true) + if err != nil { + return err + } + } + for mID := range removed { + _, err := s.Delete(RemovedMemberStoreKey(mID), true, true) + if err != nil { + return err + } + } + + return nil +} + func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { ckey := backendClusterVersionKey() @@ -289,7 +314,7 @@ func MemberAttributesStorePath(id types.ID) string { return path.Join(MemberStoreKey(id), attributesSuffix) } -func MustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID { +func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID { id, err := types.IDFromString(string(key)) if err != nil { lg.Panic("failed to parse member id from key", zap.Error(err))