diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go index 3b30d813a88..6004ee8c721 100644 --- a/etcdctl/ctlv2/command/backup_command.go +++ b/etcdctl/ctlv2/command/backup_command.go @@ -15,7 +15,6 @@ package command import ( - "encoding/binary" "log" "os" "path" @@ -25,11 +24,15 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/fileutil" + "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/idutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" + "go.etcd.io/etcd/server/v3/etcdserver/cindex" + "go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -54,11 +57,41 @@ func NewBackupCommand() cli.Command { } } +type desiredCluster struct { + clusterId types.ID + nodeId types.ID + members []*membership.Member + confState raftpb.ConfState +} + +func newDesiredCluster() desiredCluster { + idgen := idutil.NewGenerator(0, time.Now()) + nodeID := idgen.Next() + clusterID := idgen.Next() + + return desiredCluster{ + clusterId: types.ID(clusterID), + nodeId: types.ID(nodeID), + members: []*membership.Member{ + { + ID: types.ID(nodeID), + Attributes: membership.Attributes{ + Name: "etcdctl-v2-backup", + }, + RaftAttributes: membership.RaftAttributes{ + PeerURLs: []string{"http://use-flag--force-new-cluster:2080"}, + }}}, + confState: raftpb.ConfState{Voters: []uint64{nodeID}}, + } +} + // handleBackup handles a request that intends to do a backup. func handleBackup(c *cli.Context) error { var srcWAL string var destWAL string + lg := zap.NewExample() + withV3 := c.Bool("with-v3") srcSnap := filepath.Join(c.String("data-dir"), "member", "snap") destSnap := filepath.Join(c.String("backup-dir"), "member", "snap") @@ -79,13 +112,12 @@ func handleBackup(c *cli.Context) error { log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err) } - walsnap := saveSnap(destSnap, srcSnap) - metadata, state, ents := loadWAL(srcWAL, walsnap, withV3) - saveDB(filepath.Join(destSnap, "db"), filepath.Join(srcSnap, "db"), state.Commit, withV3) + desired := newDesiredCluster() - idgen := idutil.NewGenerator(0, time.Now()) - metadata.NodeID = idgen.Next() - metadata.ClusterID = idgen.Next() + walsnap := saveSnap(lg, destSnap, srcSnap, &desired) + metadata, state, ents := loadWAL(srcWAL, walsnap, withV3) + destDbPath := filepath.Join(destSnap, "db") + saveDB(lg, destDbPath, filepath.Join(srcSnap, "db"), state.Commit, &desired, withV3) neww, err := wal.Create(zap.NewExample(), destWAL, pbutil.MustMarshal(&metadata)) if err != nil { @@ -102,15 +134,17 @@ func handleBackup(c *cli.Context) error { return nil } -func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) { - ss := snap.New(zap.NewExample(), srcSnap) +func saveSnap(lg *zap.Logger, destSnap, srcSnap string, desired *desiredCluster) (walsnap walpb.Snapshot) { + ss := snap.New(lg, srcSnap) snapshot, err := ss.Load() if err != nil && err != snap.ErrNoSnapshot { log.Fatal(err) } if snapshot != nil { - walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState - newss := snap.New(zap.NewExample(), destSnap) + walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &desired.confState + newss := snap.New(lg, destSnap) + 
snapshot.Metadata.ConfState = desired.confState + snapshot.Data = mustTranslateV2store(lg, snapshot.Data, desired) if err = newss.SaveSnap(*snapshot); err != nil { log.Fatal(err) } @@ -118,6 +152,26 @@ func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) { return walsnap } +// mustTranslateV2store processes storeData such that they match 'desiredCluster'. +// In particular the method overrides membership information. +func mustTranslateV2store(lg *zap.Logger, storeData []byte, desired *desiredCluster) []byte { + st := v2store.New() + if err := st.Recovery(storeData); err != nil { + lg.Panic("cannot translate v2store", zap.Error(err)) + } + + raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members) + raftCluster.SetID(desired.nodeId, desired.clusterId) + raftCluster.SetStore(st) + raftCluster.PushMembershipToStorage() + + outputData, err := st.Save() + if err != nil { + lg.Panic("cannot save v2store", zap.Error(err)) + } + return outputData +} + func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) { w, err := wal.OpenForRead(zap.NewExample(), srcWAL, walsnap) if err != nil { @@ -190,7 +244,8 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad } // saveDB copies the v3 backend and strips cluster information. -func saveDB(destDB, srcDB string, idx uint64, v3 bool) { +func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) { + // open src db to safely copy db state if v3 { var src *bolt.DB @@ -229,35 +284,26 @@ func saveDB(destDB, srcDB string, idx uint64, v3 bool) { } } - db, err := bolt.Open(destDB, 0644, &bolt.Options{}) - if err != nil { - log.Fatal(err) - } - tx, err := db.Begin(true) - if err != nil { + be := backend.NewDefaultBackend(destDB) + defer be.Close() + + if err := membership.TrimClusterFromBackend(be); err != nil { log.Fatal(err) } - // remove membership information; should be clobbered by --force-new-cluster - for _, bucket := range []string{"members", "members_removed", "cluster"} { - tx.DeleteBucket([]byte(bucket)) - } + raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members) + raftCluster.SetID(desired.nodeId, desired.clusterId) + raftCluster.SetBackend(be) + raftCluster.PushMembershipToStorage() - // update consistent index to match hard state if !v3 { - idxBytes := make([]byte, 8) - binary.BigEndian.PutUint64(idxBytes, idx) - b, err := tx.CreateBucketIfNotExists([]byte("meta")) - if err != nil { - log.Fatal(err) - } - b.Put([]byte("consistent_index"), idxBytes) + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafeCreateBucket([]byte("meta")) + ci := cindex.NewConsistentIndex(tx) + ci.SetConsistentIndex(idx) + ci.UnsafeSave(tx) } - if err := tx.Commit(); err != nil { - log.Fatal(err) - } - if err := db.Close(); err != nil { - log.Fatal(err) - } } diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go index 74245483c5f..33e87c09eac 100644 --- a/etcdctl/ctlv3/command/migrate_command.go +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -128,7 +128,7 @@ func prepareBackend() backend.Backend { func rebuildStoreV2() (v2store.Store, uint64) { var index uint64 - cl := membership.NewCluster(zap.NewExample(), "") + cl := membership.NewCluster(zap.NewExample()) waldir := migrateWALdir if len(waldir) == 0 { diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index 7c8b159f897..98046954334 100644 
--- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -21,7 +21,6 @@ import ( "fmt" "hash/crc32" "io" - "math" "os" "path/filepath" "reflect" @@ -33,7 +32,6 @@ import ( "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/snapshot" - "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/config" @@ -42,8 +40,6 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" - "go.etcd.io/etcd/server/v3/lease" - "go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -81,11 +77,11 @@ func NewV3(lg *zap.Logger) Manager { type v3Manager struct { lg *zap.Logger - name string - dbPath string - walDir string - snapDir string - cl *membership.RaftCluster + name string + srcDbPath string + walDir string + snapDir string + cl *membership.RaftCluster skipHashCheck bool } @@ -246,27 +242,35 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { } s.name = cfg.Name - s.dbPath = cfg.SnapshotPath + s.srcDbPath = cfg.SnapshotPath s.walDir = walDir s.snapDir = filepath.Join(dataDir, "member", "snap") s.skipHashCheck = cfg.SkipHashCheck s.lg.Info( "restoring snapshot", - zap.String("path", s.dbPath), + zap.String("path", s.srcDbPath), zap.String("wal-dir", s.walDir), zap.String("data-dir", dataDir), zap.String("snap-dir", s.snapDir), + zap.Stack("stack"), ) + if err = s.saveDB(); err != nil { return err } - if err = s.saveWALAndSnap(); err != nil { + hardstate, err := s.saveWALAndSnap() + if err != nil { + return err + } + + if err := s.updateCIndex(hardstate.Commit); err != nil { return err } + s.lg.Info( "restored snapshot", - zap.String("path", s.dbPath), + zap.String("path", s.srcDbPath), zap.String("wal-dir", s.walDir), zap.String("data-dir", dataDir), zap.String("snap-dir", s.snapDir), @@ -275,23 +279,44 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { return nil } +func (s *v3Manager) outDbPath() string { + return filepath.Join(s.snapDir, "db") +} + // saveDB copies the database snapshot to the snapshot directory func (s *v3Manager) saveDB() error { - f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600) + err := s.copyAndVerifyDB() + if err != nil { + return err + } + + be := backend.NewDefaultBackend(s.outDbPath()) + defer be.Close() + + err = membership.TrimMembershipFromBackend(s.lg, be) + if err != nil { + return err + } + + return nil +} + +func (s *v3Manager) copyAndVerifyDB() error { + srcf, ferr := os.Open(s.srcDbPath) if ferr != nil { return ferr } - defer f.Close() + defer srcf.Close() // get snapshot integrity hash - if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil { + if _, err := srcf.Seek(-sha256.Size, io.SeekEnd); err != nil { return err } sha := make([]byte, sha256.Size) - if _, err := f.Read(sha); err != nil { + if _, err := srcf.Read(sha); err != nil { return err } - if _, err := f.Seek(0, io.SeekStart); err != nil { + if _, err := srcf.Seek(0, io.SeekStart); err != nil { return err } @@ -299,8 +324,9 @@ func (s *v3Manager) saveDB() error { return err } - dbpath := filepath.Join(s.snapDir, "db") - db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600) + outDbPath := s.outDbPath() + + db, dberr := os.OpenFile(outDbPath, os.O_RDWR|os.O_CREATE, 0600) if dberr != nil { return dberr } @@ -311,7 +337,7 @@ func (s *v3Manager) saveDB() error { dbClosed = 
true } }() - if _, err := io.Copy(db, f); err != nil { + if _, err := io.Copy(db, srcf); err != nil { return err } @@ -348,55 +374,23 @@ func (s *v3Manager) saveDB() error { // db hash is OK, can now modify DB so it can be part of a new cluster db.Close() - dbClosed = true - - commit := len(s.cl.Members()) - - // update consistentIndex so applies go through on etcdserver despite - // having a new raft instance - be := backend.NewDefaultBackend(dbpath) - defer be.Close() - - ci := cindex.NewConsistentIndex(be.BatchTx()) - ci.SetConsistentIndex(uint64(commit)) - - // a lessor never timeouts leases - lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci) - defer lessor.Stop() - - mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) - defer mvs.Close() - txn := mvs.Write(traceutil.TODO()) - btx := be.BatchTx() - del := func(k, v []byte) error { - txn.DeleteRange(k, nil) - return nil - } - - // delete stored members from old cluster since using new members - btx.UnsafeForEach([]byte("members"), del) - - // todo: add back new members when we start to deprecate old snap file. - btx.UnsafeForEach([]byte("members_removed"), del) - - // trigger write-out of new consistent index - txn.End() - - mvs.Commit() return nil } // saveWALAndSnap creates a WAL for the initial cluster // // TODO: This code ignores learners !!! -func (s *v3Manager) saveWALAndSnap() error { +func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) { if err := fileutil.CreateDirAll(s.walDir); err != nil { - return err + return nil, err } // add members again to persist them to the store we create. st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) s.cl.SetStore(st) + be := backend.NewDefaultBackend(s.outDbPath()) + defer be.Close() + s.cl.SetBackend(be) for _, m := range s.cl.Members() { s.cl.AddMember(m, true) } @@ -405,11 +399,11 @@ func (s *v3Manager) saveWALAndSnap() error { md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())} metadata, merr := md.Marshal() if merr != nil { - return merr + return nil, merr } w, walerr := wal.Create(s.lg, s.walDir, metadata) if walerr != nil { - return walerr + return nil, walerr } defer w.Close() @@ -417,7 +411,7 @@ func (s *v3Manager) saveWALAndSnap() error { for i, id := range s.cl.MemberIDs() { ctx, err := json.Marshal((*s.cl).Member(id)) if err != nil { - return err + return nil, err } peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } @@ -433,7 +427,7 @@ func (s *v3Manager) saveWALAndSnap() error { } d, err := cc.Marshal() if err != nil { - return err + return nil, err } ents[i] = raftpb.Entry{ Type: raftpb.EntryConfChange, @@ -444,17 +438,18 @@ func (s *v3Manager) saveWALAndSnap() error { } commit, term := uint64(len(ents)), uint64(1) - if err := w.Save(raftpb.HardState{ + hardState := raftpb.HardState{ Term: term, Vote: peers[0].ID, Commit: commit, - }, ents); err != nil { - return err + } + if err := w.Save(hardState, ents); err != nil { + return nil, err } b, berr := st.Save() if berr != nil { - return berr + return nil, berr } confState := raftpb.ConfState{ Voters: nodeIDs, @@ -469,7 +464,17 @@ func (s *v3Manager) saveWALAndSnap() error { } sn := snap.New(s.lg, s.snapDir) if err := sn.SaveSnap(raftSnap); err != nil { - return err + return nil, err } - return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term, ConfState: &confState}) + snapshot := walpb.Snapshot{Index: commit, Term: term, ConfState: &confState} + return &hardState, 
w.SaveSnapshot(snapshot) +} + +func (s *v3Manager) updateCIndex(commit uint64) error { + be := backend.NewDefaultBackend(s.outDbPath()) + defer be.Close() + ci := cindex.NewConsistentIndex(be.BatchTx()) + ci.SetConsistentIndex(commit) + ci.UnsafeSave(be.BatchTx()) + return nil } diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index a8a27984f4a..c335c7db240 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -48,7 +48,6 @@ type RaftCluster struct { localID types.ID cid types.ID - token string v2store v2store.Store be backend.Backend @@ -82,7 +81,7 @@ const ( // NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating // cluster with raft learner member. func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) { - c := NewCluster(lg, token) + c := NewCluster(lg) for name, urls := range urlsmap { m := NewMember(name, urls, token, nil) if _, ok := c.members[m.ID]; ok { @@ -97,8 +96,8 @@ func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) return c, nil } -func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member) *RaftCluster { - c := NewCluster(lg, token) +func NewClusterFromMembers(lg *zap.Logger, id types.ID, membs []*Member) *RaftCluster { + c := NewCluster(lg) c.cid = id for _, m := range membs { c.members[m.ID] = m @@ -106,13 +105,12 @@ func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*M return c } -func NewCluster(lg *zap.Logger, token string) *RaftCluster { +func NewCluster(lg *zap.Logger) *RaftCluster { if lg == nil { lg = zap.NewNop() } return &RaftCluster{ lg: lg, - token: token, members: make(map[types.ID]*Member), removed: make(map[types.ID]bool), downgradeInfo: &DowngradeInfo{Enabled: false}, @@ -676,6 +674,10 @@ func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, m return members, removed } +func membersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) { + return mustReadMembersFromBackend(lg, be) +} + func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version { e, err := st.Get(path.Join(storePrefix, "version"), false, false) if err != nil { @@ -857,3 +859,20 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID { sort.Sort(types.IDSlice(ids)) return ids } + +// PushMembershipToStorage is overriding storage information about cluster's +// members, such that they fully reflect internal RaftCluster's storage. 
+func (c *RaftCluster) PushMembershipToStorage() { + if c.be != nil { + TrimMembershipFromBackend(c.lg, c.be) + for _, m := range c.members { + mustSaveMemberToBackend(c.lg, c.be, m) + } + } + if c.v2store != nil { + TrimMembershipFromV2Store(c.lg, c.v2store) + for _, m := range c.members { + mustSaveMemberToStore(c.lg, c.v2store, m) + } + } +} diff --git a/server/etcdserver/api/membership/cluster_test.go b/server/etcdserver/api/membership/cluster_test.go index c3975df25b0..23d81fec1ca 100644 --- a/server/etcdserver/api/membership/cluster_test.go +++ b/server/etcdserver/api/membership/cluster_test.go @@ -279,7 +279,7 @@ func TestClusterValidateAndAssignIDs(t *testing.T) { } func TestClusterValidateConfigurationChange(t *testing.T) { - cl := NewCluster(zap.NewExample(), "") + cl := NewCluster(zaptest.NewLogger(t)) cl.SetStore(v2store.New()) for i := 1; i <= 4; i++ { attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}} diff --git a/server/etcdserver/api/membership/member.go b/server/etcdserver/api/membership/member.go index 8ab1f4d4170..696549d2d26 100644 --- a/server/etcdserver/api/membership/member.go +++ b/server/etcdserver/api/membership/member.go @@ -20,6 +20,7 @@ import ( "fmt" "math/rand" "sort" + "strings" "time" "go.etcd.io/etcd/client/pkg/v3/types" @@ -49,29 +50,22 @@ type Member struct { // NewMember creates a Member without an ID and generates one based on the // cluster name, peer URLs, and time. This is used for bootstrapping/adding new member. func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member { - return newMember(name, peerURLs, clusterName, now, false) + memberId := computeMemberId(peerURLs, clusterName, now) + return newMember(name, peerURLs, memberId, false) } // NewMemberAsLearner creates a learner Member without an ID and generates one based on the // cluster name, peer URLs, and time. This is used for adding new learner member. func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member { - return newMember(name, peerURLs, clusterName, now, true) + memberId := computeMemberId(peerURLs, clusterName, now) + return newMember(name, peerURLs, memberId, true) } -func newMember(name string, peerURLs types.URLs, clusterName string, now *time.Time, isLearner bool) *Member { - m := &Member{ - RaftAttributes: RaftAttributes{ - PeerURLs: peerURLs.StringSlice(), - IsLearner: isLearner, - }, - Attributes: Attributes{Name: name}, - } - - var b []byte - sort.Strings(m.PeerURLs) - for _, p := range m.PeerURLs { - b = append(b, []byte(p)...) - } +func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID { + peerURLstrs := peerURLs.StringSlice() + sort.Strings(peerURLstrs) + joinedPeerUrls := strings.Join(peerURLstrs, "") + b := []byte(joinedPeerUrls) b = append(b, []byte(clusterName)...) 
if now != nil { @@ -79,7 +73,18 @@ func newMember(name string, peerURLs types.URLs, clusterName string, now *time.T } hash := sha1.Sum(b) - m.ID = types.ID(binary.BigEndian.Uint64(hash[:8])) + return types.ID(binary.BigEndian.Uint64(hash[:8])) +} + +func newMember(name string, peerURLs types.URLs, memberId types.ID, isLearner bool) *Member { + m := &Member{ + RaftAttributes: RaftAttributes{ + PeerURLs: peerURLs.StringSlice(), + IsLearner: isLearner, + }, + Attributes: Attributes{Name: name}, + ID: memberId, + } return m } diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index c3593394c82..96b20cb0e69 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -57,6 +57,16 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { tx.UnsafePut(membersBucketName, mkey, mvalue) } +// TrimClusterFromBackend removes all information about cluster (versions) +// from the v3 backend. +func TrimClusterFromBackend(be backend.Backend) error { + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafeDeleteBucket(clusterBucketName) + return nil +} + func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { mkey := backendMemberKey(id) @@ -67,6 +77,90 @@ func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { tx.UnsafePut(membersRemovedBucketName, mkey, []byte("removed")) } +func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) { + members := make(map[types.ID]*Member) + removed := make(map[types.ID]bool) + + tx := be.ReadTx() + tx.RLock() + defer tx.RUnlock() + err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error { + memberId := mustParseMemberIDFromBytes(lg, k) + m := &Member{ID: memberId} + if err := json.Unmarshal(v, &m); err != nil { + return err + } + members[memberId] = m + return nil + }) + if err != nil { + return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err) + } + + err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error { + memberId := mustParseMemberIDFromBytes(lg, k) + removed[memberId] = true + return nil + }) + if err != nil { + return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err) + } + return members, removed, nil +} + +func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) { + members, removed, err := readMembersFromBackend(lg, be) + if err != nil { + lg.Panic("couldn't read members from backend", zap.Error(err)) + } + return members, removed +} + +// TrimMembershipFromBackend removes all information about members & +// removed_members from the v3 backend. 
+func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error { + lg.Info("Trimming membership information from the backend...") + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error { + tx.UnsafeDelete(membersBucketName, k) + lg.Debug("Removed member from the backend", + zap.Stringer("member", mustParseMemberIDFromBytes(lg, k))) + return nil + }) + if err != nil { + return err + } + return tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error { + tx.UnsafeDelete(membersRemovedBucketName, k) + lg.Debug("Removed removed_member from the backend", + zap.Stringer("member", mustParseMemberIDFromBytes(lg, k))) + return nil + }) +} + +// TrimMembershipFromV2Store removes all information about members & +// removed_members from the v2 store. +func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error { + members, removed := membersFromStore(lg, s) + + for mID := range members { + _, err := s.Delete(MemberStoreKey(mID), true, true) + if err != nil { + return err + } + } + for mID := range removed { + _, err := s.Delete(RemovedMemberStoreKey(mID), true, true) + if err != nil { + return err + } + } + + return nil +} + func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { ckey := backendClusterVersionKey() @@ -221,10 +315,18 @@ func MemberAttributesStorePath(id types.ID) string { return path.Join(MemberStoreKey(id), attributesSuffix) } +func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID { + id, err := types.IDFromString(string(key)) + if err != nil { + lg.Panic("failed to parse member id from key", zap.Error(err)) + } + return id +} + func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID { id, err := types.IDFromString(path.Base(key)) if err != nil { - lg.Panic("failed to parse memver id from key", zap.Error(err)) + lg.Panic("failed to parse member id from key", zap.Error(err)) } return id } diff --git a/server/etcdserver/api/membership/store_test.go b/server/etcdserver/api/membership/store_test.go new file mode 100644 index 00000000000..d39a2b1034d --- /dev/null +++ b/server/etcdserver/api/membership/store_test.go @@ -0,0 +1,43 @@ +package membership + +import ( + "testing" + + "github.com/coreos/go-semver/semver" + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/client/pkg/v3/types" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + + "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.uber.org/zap" +) + +func TestAddRemoveMember(t *testing.T) { + c := newTestCluster(t, nil) + be, bepath := betesting.NewDefaultTmpBackend(t) + c.SetBackend(be) + c.AddMember(newTestMember(17, nil, "node17", nil), true) + c.RemoveMember(17, true) + c.AddMember(newTestMember(18, nil, "node18", nil), true) + + // Skipping removal of already removed member + c.RemoveMember(17, true) + err := be.Close() + assert.NoError(t, err) + + be2 := backend.NewDefaultBackend(bepath) + defer func() { + assert.NoError(t, be2.Close()) + }() + + if false { + // TODO: Enable this code when Recover is reading membership from the backend. 
+ c2 := newTestCluster(t, nil) + c2.SetBackend(be2) + c2.Recover(func(*zap.Logger, *semver.Version) {}) + assert.Equal(t, []*Member{{ID: types.ID(18), + Attributes: Attributes{Name: "node18"}}}, c2.Members()) + assert.Equal(t, true, c2.IsIDRemoved(17)) + assert.Equal(t, false, c2.IsIDRemoved(18)) + } +} diff --git a/server/etcdserver/cluster_util.go b/server/etcdserver/cluster_util.go index 8dd8f205d2f..595586e2012 100644 --- a/server/etcdserver/cluster_util.go +++ b/server/etcdserver/cluster_util.go @@ -113,7 +113,7 @@ func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Durat // if membership members are not present then the raft cluster formed will be // an invalid empty cluster hence return failed to get raft cluster member(s) from the given urls error if len(membs) > 0 { - return membership.NewClusterFromMembers(lg, "", id, membs), nil + return membership.NewClusterFromMembers(lg, id, membs), nil } return nil, fmt.Errorf("failed to get raft cluster member(s) from the given URLs") } diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 664894cbc54..bb248eb3635 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -487,7 +487,7 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, zap.String("local-member-id", id.String()), zap.Uint64("commit-index", st.Commit), ) - cl := membership.NewCluster(cfg.Logger, "") + cl := membership.NewCluster(cfg.Logger) cl.SetID(id, cid) s := raft.NewMemoryStorage() if snapshot != nil { @@ -565,7 +565,7 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) zap.Uint64("commit-index", st.Commit), ) - cl := membership.NewCluster(cfg.Logger, "") + cl := membership.NewCluster(cfg.Logger) cl.SetID(id, cid) s := raft.NewMemoryStorage() if snapshot != nil { diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go index 871b15978db..3eb5345dc25 100644 --- a/server/etcdserver/raft_test.go +++ b/server/etcdserver/raft_test.go @@ -230,7 +230,7 @@ func TestConfigChangeBlocksApply(t *testing.T) { func TestProcessDuplicatedAppRespMessage(t *testing.T) { n := newNopReadyNode() - cl := membership.NewCluster(zap.NewExample(), "abc") + cl := membership.NewCluster(zap.NewExample()) rs := raft.NewMemoryStorage() p := mockstorage.NewStorageRecorder("") diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index b50b0362556..50e8533a97e 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -178,7 +178,7 @@ func TestApplyRepeat(t *testing.T) { n.readyc <- raft.Ready{ SoftState: &raft.SoftState{RaftState: raft.StateLeader}, } - cl := newTestCluster(nil) + cl := newTestCluster(t, nil) st := v2store.New() cl.SetStore(v2store.New()) cl.AddMember(&membership.Member{ID: 1234}, true) @@ -483,7 +483,7 @@ func TestApplyRequest(t *testing.T) { } func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { - cl := newTestCluster([]*membership.Member{{ID: 1}}) + cl := newTestCluster(t, []*membership.Member{{ID: 1}}) srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), @@ -506,7 +506,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { } func TestApplyConfChangeError(t *testing.T) { - cl := membership.NewCluster(zap.NewExample(), "") + cl := membership.NewCluster(zaptest.NewLogger(t)) cl.SetStore(v2store.New()) for i := 1; i <= 4; i++ { cl.AddMember(&membership.Member{ID: types.ID(i)}, true) @@ -594,7 +594,7 @@ func TestApplyConfChangeError(t *testing.T) { } func 
TestApplyConfChangeShouldStop(t *testing.T) { - cl := membership.NewCluster(zap.NewExample(), "") + cl := membership.NewCluster(zaptest.NewLogger(t)) cl.SetStore(v2store.New()) for i := 1; i <= 3; i++ { cl.AddMember(&membership.Member{ID: types.ID(i)}, true) @@ -638,7 +638,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { // TestApplyConfigChangeUpdatesConsistIndex ensures a config change also updates the consistIndex // where consistIndex equals to applied index. func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { - cl := membership.NewCluster(zap.NewExample(), "") + cl := membership.NewCluster(zaptest.NewLogger(t)) cl.SetStore(v2store.New()) cl.AddMember(&membership.Member{ID: types.ID(1)}, true) r := newRaftNode(raftNodeConfig{ @@ -685,7 +685,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop // if the local member is removed along with other conf updates. func TestApplyMultiConfChangeShouldStop(t *testing.T) { - cl := membership.NewCluster(zap.NewExample(), "") + cl := membership.NewCluster(zaptest.NewLogger(t)) cl.SetStore(v2store.New()) for i := 1; i <= 5; i++ { cl.AddMember(&membership.Member{ID: types.ID(i)}, true) @@ -1038,7 +1038,7 @@ func TestSnapshot(t *testing.T) { func TestSnapshotOrdering(t *testing.T) { n := newNopReadyNode() st := v2store.New() - cl := membership.NewCluster(zap.NewExample(), "abc") + cl := membership.NewCluster(zaptest.NewLogger(t)) cl.SetStore(st) testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir") @@ -1192,7 +1192,7 @@ func TestTriggerSnap(t *testing.T) { func TestConcurrentApplyAndSnapshotV3(t *testing.T) { n := newNopReadyNode() st := v2store.New() - cl := membership.NewCluster(zap.NewExample(), "abc") + cl := membership.NewCluster(zaptest.NewLogger(t)) cl.SetStore(st) testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir") @@ -1292,7 +1292,7 @@ func TestAddMember(t *testing.T) { n.readyc <- raft.Ready{ SoftState: &raft.SoftState{RaftState: raft.StateLeader}, } - cl := newTestCluster(nil) + cl := newTestCluster(t, nil) st := v2store.New() cl.SetStore(st) r := newRaftNode(raftNodeConfig{ @@ -1336,7 +1336,7 @@ func TestRemoveMember(t *testing.T) { n.readyc <- raft.Ready{ SoftState: &raft.SoftState{RaftState: raft.StateLeader}, } - cl := newTestCluster(nil) + cl := newTestCluster(t, nil) st := v2store.New() cl.SetStore(v2store.New()) cl.AddMember(&membership.Member{ID: 1234}, true) @@ -1380,7 +1380,7 @@ func TestUpdateMember(t *testing.T) { n.readyc <- raft.Ready{ SoftState: &raft.SoftState{RaftState: raft.StateLeader}, } - cl := newTestCluster(nil) + cl := newTestCluster(t, nil) st := v2store.New() cl.SetStore(st) cl.AddMember(&membership.Member{ID: 1234}, true) @@ -1689,6 +1689,7 @@ func TestStopNotify(t *testing.T) { } func TestGetOtherPeerURLs(t *testing.T) { + lg := zaptest.NewLogger(t) tests := []struct { membs []*membership.Member wurls []string @@ -1717,7 +1718,7 @@ func TestGetOtherPeerURLs(t *testing.T) { }, } for i, tt := range tests { - cl := membership.NewClusterFromMembers(zap.NewExample(), "", types.ID(0), tt.membs) + cl := membership.NewClusterFromMembers(lg, types.ID(0), tt.membs) self := "1" urls := getRemotePeerURLs(cl, self) if !reflect.DeepEqual(urls, tt.wurls) { @@ -1868,8 +1869,8 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { return nil } -func newTestCluster(membs []*membership.Member) *membership.RaftCluster { - c := membership.NewCluster(zap.NewExample(), "") +func 
newTestCluster(t testing.TB, membs []*membership.Member) *membership.RaftCluster { + c := membership.NewCluster(zaptest.NewLogger(t)) for _, m := range membs { c.AddMember(m, true) } diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index eb75c29fc92..d4bc8c68428 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -28,6 +28,7 @@ import ( type BatchTx interface { ReadTx UnsafeCreateBucket(name []byte) + UnsafeDeleteBucket(name []byte) UnsafePut(bucketName []byte, key []byte, value []byte) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) UnsafeDelete(bucketName []byte, key []byte) @@ -80,6 +81,18 @@ func (t *batchTx) UnsafeCreateBucket(name []byte) { t.pending++ } +func (t *batchTx) UnsafeDeleteBucket(name []byte) { + err := t.tx.DeleteBucket(name) + if err != nil && err != bolt.ErrBucketNotFound { + t.backend.lg.Fatal( + "failed to delete a bucket", + zap.String("bucket-name", string(name)), + zap.Error(err), + ) + } + t.pending++ +} + // UnsafePut must be called holding the lock on the tx. func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { t.unsafePut(bucketName, key, value, false) diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 1bb3fae24e3..10fa500535b 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -875,6 +875,7 @@ func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} func (b *fakeBatchTx) RUnlock() {} func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {} +func (b *fakeBatchTx) UnsafeDeleteBucket(name []byte) {} func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}}) } diff --git a/tests/e2e/ctl_v2_test.go b/tests/e2e/ctl_v2_test.go index 3de007fec46..e8c1131177d 100644 --- a/tests/e2e/ctl_v2_test.go +++ b/tests/e2e/ctl_v2_test.go @@ -222,15 +222,15 @@ func TestCtlV2BackupV3Snapshot(t *testing.T) { testCtlV2Backup(t, 1, true) } func testCtlV2Backup(t *testing.T, snapCount int, v3 bool) { BeforeTestV2(t) - backupDir, err := ioutil.TempDir("", "testbackup0.etcd") + backupDir, err := ioutil.TempDir(t.TempDir(), "testbackup0.etcd") if err != nil { t.Fatal(err) } - defer os.RemoveAll(backupDir) etcdCfg := newConfigNoTLS() etcdCfg.snapshotCount = snapCount etcdCfg.enableV2 = true + t.Log("Starting etcd-1") epc1 := setupEtcdctlTest(t, etcdCfg, false) // v3 put before v2 set so snapshot happens after v3 operations to confirm @@ -241,23 +241,30 @@ func testCtlV2Backup(t *testing.T, snapCount int, v3 bool) { } os.Setenv("ETCDCTL_API", "2") + t.Log("Setting key in etcd-1") if err := etcdctlSet(epc1, "foo1", "bar1"); err != nil { t.Fatal(err) } if v3 { + t.Log("Stopping etcd-1") // v3 must lock the db to backup, so stop process if err := epc1.Stop(); err != nil { t.Fatal(err) } } - if err := etcdctlBackup(epc1, epc1.procs[0].Config().dataDirPath, backupDir, v3); err != nil { + t.Log("Triggering etcd backup") + if err := etcdctlBackup(t, epc1, epc1.procs[0].Config().dataDirPath, backupDir, v3); err != nil { t.Fatal(err) } + t.Log("Closing etcd-1 backup") if err := epc1.Close(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) } + t.Logf("Backup directory: %s", backupDir) + + t.Log("Starting etcd-2 (post backup)") // restart from the backup directory cfg2 := newConfigNoTLS() cfg2.dataDirPath = backupDir @@ -268,6 +275,7 @@ func testCtlV2Backup(t *testing.T, snapCount int, v3 bool) { // Make sure a 
failing test is not leaking resources (running server). defer epc2.Close() + t.Log("Getting examplar key") // check if backup went through correctly if err := etcdctlGet(epc2, "foo1", "bar1", false); err != nil { t.Fatal(err) @@ -276,6 +284,7 @@ func testCtlV2Backup(t *testing.T, snapCount int, v3 bool) { os.Setenv("ETCDCTL_API", "3") ctx2 := ctlCtx{t: t, epc: epc2} if v3 { + t.Log("Getting v3 examplar key") if err := ctlV3Get(ctx2, []string{"v3key"}, kv{"v3key", "123"}); err != nil { t.Fatal(err) } @@ -286,6 +295,7 @@ func testCtlV2Backup(t *testing.T, snapCount int, v3 bool) { } os.Setenv("ETCDCTL_API", "2") + t.Log("Getting examplar key foo2") // check if it can serve client requests if err := etcdctlSet(epc2, "foo2", "bar2"); err != nil { t.Fatal(err) @@ -294,6 +304,7 @@ func testCtlV2Backup(t *testing.T, snapCount int, v3 bool) { t.Fatal(err) } + t.Log("Closing etcd-2") if err := epc2.Close(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) } @@ -472,11 +483,12 @@ func etcdctlAuthEnable(clus *etcdProcessCluster) error { return spawnWithExpect(cmdArgs, "Authentication Enabled") } -func etcdctlBackup(clus *etcdProcessCluster, dataDir, backupDir string, v3 bool) error { +func etcdctlBackup(t testing.TB, clus *etcdProcessCluster, dataDir, backupDir string, v3 bool) error { cmdArgs := append(etcdctlPrefixArgs(clus), "backup", "--data-dir", dataDir, "--backup-dir", backupDir) if v3 { cmdArgs = append(cmdArgs, "--with-v3") } + t.Logf("Running: %v", cmdArgs) proc, err := spawnCmd(cmdArgs) if err != nil { return err diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go index 7484d669b02..55f3494eb9a 100644 --- a/tests/e2e/etcd_process.go +++ b/tests/e2e/etcd_process.go @@ -24,7 +24,7 @@ import ( ) var ( - etcdServerReadyLines = []string{"enabled capabilities for version", "published", "ready to serve client requests"} + etcdServerReadyLines = []string{"ready to serve client requests"} binPath string ctlBinPath string ) diff --git a/tests/e2e/etcd_release_upgrade_test.go b/tests/e2e/etcd_release_upgrade_test.go index eaa487cb44d..78caef96fac 100644 --- a/tests/e2e/etcd_release_upgrade_test.go +++ b/tests/e2e/etcd_release_upgrade_test.go @@ -49,20 +49,6 @@ func TestReleaseUpgrade(t *testing.T) { t.Fatalf("error closing etcd processes (%v)", errC) } }() - // 3.0 boots as 2.3 then negotiates up to 3.0 - // so there's a window at boot time where it doesn't have V3rpcCapability enabled - // poll /version until etcdcluster is >2.3.x before making v3 requests - for i := 0; i < 7; i++ { - if err = cURLGet(epc, cURLReq{endpoint: "/version", expected: `"etcdcluster":"3.`}); err != nil { - t.Logf("#%d: v3 is not ready yet (%v)", i, err) - time.Sleep(time.Second) - continue - } - break - } - if err != nil { - t.Fatalf("cannot pull version (%v)", err) - } os.Setenv("ETCDCTL_API", "3") defer os.Unsetenv("ETCDCTL_API") @@ -83,24 +69,32 @@ func TestReleaseUpgrade(t *testing.T) { } } + t.Log("Cluster of etcd in old version running") + for i := range epc.procs { + t.Logf("Stopping node: %v", i) if err := epc.procs[i].Stop(); err != nil { t.Fatalf("#%d: error closing etcd process (%v)", i, err) } + t.Logf("Stopped node: %v", i) epc.procs[i].Config().execPath = binDir + "/etcd" epc.procs[i].Config().keepDataDir = true + t.Logf("Restarting node in the new version: %v", i) if err := epc.procs[i].Restart(); err != nil { t.Fatalf("error restarting etcd process (%v)", err) } + t.Logf("Testing reads after node restarts: %v", i) for j := range kvs { if err := ctlV3Get(cx, 
[]string{kvs[j].key}, []kv{kvs[j]}...); err != nil { cx.t.Fatalf("#%d-%d: ctlV3Get error (%v)", i, j, err) } } + t.Logf("Tested reads after node restarts: %v", i) } + t.Log("Waiting for full upgrade...") // TODO: update after release candidate // expect upgraded cluster version // new cluster version needs more time to upgrade @@ -116,6 +110,7 @@ func TestReleaseUpgrade(t *testing.T) { if err != nil { t.Fatalf("cluster version is not upgraded (%v)", err) } + t.Log("TestReleaseUpgrade businessLogic DONE") } func TestReleaseUpgradeWithRestart(t *testing.T) { diff --git a/tests/e2e/util.go b/tests/e2e/util.go index fba768e5036..2841d94fb05 100644 --- a/tests/e2e/util.go +++ b/tests/e2e/util.go @@ -27,15 +27,13 @@ import ( ) func waitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error { - c := 0 matchSet := func(l string) bool { for _, s := range readyStrs { if strings.Contains(l, s) { - c++ - break + return true } } - return c == len(readyStrs) + return false } _, err := exproc.ExpectFunc(matchSet) return err diff --git a/tools/etcd-dump-logs/main.go b/tools/etcd-dump-logs/main.go index c609a60fbed..b5635634430 100644 --- a/tools/etcd-dump-logs/main.go +++ b/tools/etcd-dump-logs/main.go @@ -18,6 +18,7 @@ import ( "bufio" "bytes" "encoding/hex" + "encoding/json" "flag" "fmt" "io" @@ -88,8 +89,12 @@ and output a hex encoded line of binary for each input line`) case nil: walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term nodes := genIDSlice(snapshot.Metadata.ConfState.Voters) - fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n", - walsnap.Term, walsnap.Index, nodes) + confstateJson, err := json.Marshal(snapshot.Metadata.ConfState) + if err != nil { + confstateJson = []byte(fmt.Sprintf("confstate err: %v", err)) + } + fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s confstate=%s\n", + walsnap.Term, walsnap.Index, nodes, confstateJson) case snap.ErrNoSnapshot: fmt.Printf("Snapshot:\nempty\n") default:
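
For reference, the membership rewrite that the patched etcdctl backup path performs (newDesiredCluster plus the new saveDB in backup_command.go) reduces to the flow below. This is a minimal sketch, not the patch itself: the package name, the rewriteMembership function, and the lg/dbPath parameters are illustrative, while the helper calls (TrimClusterFromBackend, NewClusterFromMembers, SetID, SetBackend, PushMembershipToStorage) and the placeholder peer URL are taken from the diff above.

package backup

import (
	"time"

	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/pkg/v3/idutil"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
	"go.uber.org/zap"
)

// rewriteMembership overwrites whatever membership the copied db contains with
// a single placeholder member, following saveDB/newDesiredCluster above.
// Illustrative sketch: function name and parameters are not part of the patch.
func rewriteMembership(lg *zap.Logger, dbPath string) error {
	// Fresh node and cluster IDs, generated the same way newDesiredCluster does.
	idgen := idutil.NewGenerator(0, time.Now())
	nodeID := types.ID(idgen.Next())
	clusterID := types.ID(idgen.Next())

	members := []*membership.Member{{
		ID:         nodeID,
		Attributes: membership.Attributes{Name: "etcdctl-v2-backup"},
		RaftAttributes: membership.RaftAttributes{
			PeerURLs: []string{"http://use-flag--force-new-cluster:2080"},
		},
	}}

	be := backend.NewDefaultBackend(dbPath)
	defer be.Close()

	// Drop stale cluster metadata, then push the desired membership to the backend.
	if err := membership.TrimClusterFromBackend(be); err != nil {
		return err
	}
	cl := membership.NewClusterFromMembers(lg, clusterID, members)
	cl.SetID(nodeID, clusterID)
	cl.SetBackend(be)
	cl.PushMembershipToStorage()
	return nil
}

PushMembershipToStorage itself trims and re-saves the members in both the v3 backend and the v2 store, so after a --force-new-cluster restart only the single desired member is visible.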
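
Both restore paths now finish by aligning the stored consistent index with the raft hard state: backup_command.go does it inline under the batch-tx lock for the !v3 case, and v3_snapshot.go adds updateCIndex after the WAL and snapshot are written. A minimal sketch of that step, assuming the cindex and backend packages used above; the package name, function name, and dbPath argument are placeholders.

package restore

import (
	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
)

// alignConsistentIndex mirrors v3Manager.updateCIndex: bump the consistent
// index in the restored db to the hard-state commit so that applies go
// through on the new raft instance instead of being skipped as stale.
func alignConsistentIndex(dbPath string, commit uint64) error {
	be := backend.NewDefaultBackend(dbPath)
	defer be.Close()

	ci := cindex.NewConsistentIndex(be.BatchTx())
	ci.SetConsistentIndex(commit)
	ci.UnsafeSave(be.BatchTx())
	return nil
}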
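
The member.go change factors the ID derivation out of newMember into computeMemberId: sha1 over the sorted, concatenated peer URLs, the cluster name, and optionally a timestamp, with the first 8 bytes of the digest read big-endian. The stdlib-only sketch below mirrors that derivation using plain strings and uint64 in place of types.URLs and types.ID; the memberID function name and the demo values in main are illustrative, and the timestamp branch follows the unchanged newMember context rather than the hunk itself.

package main

import (
	"crypto/sha1"
	"encoding/binary"
	"fmt"
	"sort"
	"strings"
	"time"
)

// memberID mirrors computeMemberId: hash the sorted, joined peer URLs, the
// cluster name, and (optionally) a timestamp, then take the first 8 bytes of
// the sha1 digest as a big-endian member ID.
func memberID(peerURLs []string, clusterName string, now *time.Time) uint64 {
	urls := append([]string(nil), peerURLs...)
	sort.Strings(urls)

	b := []byte(strings.Join(urls, ""))
	b = append(b, []byte(clusterName)...)
	if now != nil {
		// Follows the pre-existing newMember context (appending now.Unix());
		// this branch is not shown in the hunk above.
		b = append(b, []byte(fmt.Sprintf("%d", now.Unix()))...)
	}

	hash := sha1.Sum(b)
	return binary.BigEndian.Uint64(hash[:8])
}

func main() {
	fmt.Printf("%016x\n", memberID([]string{"http://127.0.0.1:2380"}, "etcd-cluster", nil))
}

With a nil timestamp the result is deterministic, so the same peer URLs and cluster name always map to the same member ID.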