From 7695db66169120467034e4bad43ff0f1500dd6e6 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Sat, 10 Apr 2021 11:42:15 +0200 Subject: [PATCH] Applying consistency fix: ClusterVersionSet (and co) might get no applied on v2store ClusterVersionSet, ClusterMemberAttrSet, DowngradeInfoSet functions are writing both to V2store and backend. Prior this CL there were in a branch not executed if shouldApplyV3 was false, e.g. during restore when Backend is up-to-date (has high consistency-index) while v2store requires replay from WAL log. The most serious consequence of this bug was that v2store after restore could have different index (revision) than the same exact store before restore, so potentially different content between replicas. Also this change is supressing double-applying of Membership (ClusterConfig) changes on Backend (store v3) - that lackilly are not part of MVCC/KeyValue store, so they didn't caused Revisions to be bumped. Inspired by jingyih@ comment: https://github.com/etcd-io/etcd/pull/12820#issuecomment-815299406 --- etcdctl/ctlv3/command/migrate_command.go | 6 +-- etcdctl/snapshot/v3_snapshot.go | 2 +- server/etcdserver/api/membership/cluster.go | 29 +++++------ .../etcdserver/api/membership/cluster_test.go | 14 +++--- server/etcdserver/apply.go | 49 ++++++++++++------- server/etcdserver/apply_auth.go | 4 +- server/etcdserver/apply_v2.go | 2 +- server/etcdserver/server.go | 35 ++++++++----- server/etcdserver/server_test.go | 24 ++++----- server/mvcc/kvstore.go | 5 +- tests/go.mod | 1 + .../integration/clientv3/ordering_kv_test.go | 23 +++++---- 12 files changed, 112 insertions(+), 82 deletions(-) diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go index c287f0329177..74245483c5f8 100644 --- a/etcdctl/ctlv3/command/migrate_command.go +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -208,15 +208,15 @@ func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) { if err := json.Unmarshal(cc.Context, m); err != nil { panic(err) } - cl.AddMember(m) + cl.AddMember(m, true) case raftpb.ConfChangeRemoveNode: - cl.RemoveMember(types.ID(cc.NodeID)) + cl.RemoveMember(types.ID(cc.NodeID), true) case raftpb.ConfChangeUpdateNode: m := new(membership.Member) if err := json.Unmarshal(cc.Context, m); err != nil { panic(err) } - cl.UpdateRaftAttributes(m.ID, m.RaftAttributes) + cl.UpdateRaftAttributes(m.ID, m.RaftAttributes, true) } } diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index 48ea0597a332..7c8b159f8977 100644 --- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -398,7 +398,7 @@ func (s *v3Manager) saveWALAndSnap() error { st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) s.cl.SetStore(st) for _, m := range s.cl.Members() { - s.cl.AddMember(m) + s.cl.AddMember(m, true) } m := s.cl.MemberByName(s.name) diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index ea9d58aef9a7..1d6949d59f87 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -285,6 +285,7 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { + // TODO: this must be switched to backend as well. members, removed := membersFromStore(c.lg, c.v2store) id := types.ID(cc.NodeID) if removed[id] { @@ -370,13 +371,13 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { // AddMember adds a new Member into the cluster, and saves the given member's // raftAttributes into the store. The given member should have empty attributes. // A Member with a matching id must not exist. -func (c *RaftCluster) AddMember(m *Member) { +func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 bool) { c.Lock() defer c.Unlock() if c.v2store != nil { mustSaveMemberToStore(c.lg, c.v2store, m) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveMemberToBackend(c.lg, c.be, m) } @@ -393,13 +394,13 @@ func (c *RaftCluster) AddMember(m *Member) { // RemoveMember removes a member from the store. // The given id MUST exist, or the function panics. -func (c *RaftCluster) RemoveMember(id types.ID) { +func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 bool) { c.Lock() defer c.Unlock() if c.v2store != nil { mustDeleteMemberFromStore(c.lg, c.v2store, id) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustDeleteMemberFromBackend(c.be, id) } @@ -425,7 +426,7 @@ func (c *RaftCluster) RemoveMember(id types.ID) { } } -func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) { +func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 bool) { c.Lock() defer c.Unlock() @@ -434,7 +435,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) { if c.v2store != nil { mustUpdateMemberAttrInStore(c.lg, c.v2store, m) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveMemberToBackend(c.lg, c.be, m) } return @@ -459,7 +460,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) { } // PromoteMember marks the member's IsLearner RaftAttributes to false. -func (c *RaftCluster) PromoteMember(id types.ID) { +func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 bool) { c.Lock() defer c.Unlock() @@ -467,7 +468,7 @@ func (c *RaftCluster) PromoteMember(id types.ID) { if c.v2store != nil { mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveMemberToBackend(c.lg, c.be, c.members[id]) } @@ -478,7 +479,7 @@ func (c *RaftCluster) PromoteMember(id types.ID) { ) } -func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { +func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 bool) { c.Lock() defer c.Unlock() @@ -486,7 +487,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) if c.v2store != nil { mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveMemberToBackend(c.lg, c.be, c.members[id]) } @@ -508,7 +509,7 @@ func (c *RaftCluster) Version() *semver.Version { return semver.Must(semver.NewVersion(c.version.String())) } -func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version)) { +func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 bool) { c.Lock() defer c.Unlock() if c.version != nil { @@ -533,7 +534,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s if c.v2store != nil { mustSaveClusterVersionToStore(c.lg, c.v2store, ver) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveClusterVersionToBackend(c.be, ver) } if oldVer != nil { @@ -809,11 +810,11 @@ func (c *RaftCluster) DowngradeInfo() *DowngradeInfo { return d } -func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo) { +func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 bool) { c.Lock() defer c.Unlock() - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveDowngradeToBackend(c.lg, c.be, d) } diff --git a/server/etcdserver/api/membership/cluster_test.go b/server/etcdserver/api/membership/cluster_test.go index a2fff321f21a..c3975df25b0a 100644 --- a/server/etcdserver/api/membership/cluster_test.go +++ b/server/etcdserver/api/membership/cluster_test.go @@ -283,9 +283,9 @@ func TestClusterValidateConfigurationChange(t *testing.T) { cl.SetStore(v2store.New()) for i := 1; i <= 4; i++ { attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}} - cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}) + cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}, true) } - cl.RemoveMember(4) + cl.RemoveMember(4, true) attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) @@ -446,7 +446,7 @@ func TestClusterGenID(t *testing.T) { previd := cs.ID() cs.SetStore(mockstore.NewNop()) - cs.AddMember(newTestMember(3, nil, "", nil)) + cs.AddMember(newTestMember(3, nil, "", nil), true) cs.genID() if cs.ID() == previd { t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) @@ -489,7 +489,7 @@ func TestClusterAddMember(t *testing.T) { st := mockstore.NewRecorder() c := newTestCluster(t, nil) c.SetStore(st) - c.AddMember(newTestMember(1, nil, "node1", nil)) + c.AddMember(newTestMember(1, nil, "node1", nil), true) wactions := []testutil.Action{ { @@ -512,7 +512,7 @@ func TestClusterAddMemberAsLearner(t *testing.T) { st := mockstore.NewRecorder() c := newTestCluster(t, nil) c.SetStore(st) - c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil)) + c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil), true) wactions := []testutil.Action{ { @@ -555,7 +555,7 @@ func TestClusterRemoveMember(t *testing.T) { st := mockstore.NewRecorder() c := newTestCluster(t, nil) c.SetStore(st) - c.RemoveMember(1) + c.RemoveMember(1, true) wactions := []testutil.Action{ {Name: "Delete", Params: []interface{}{MemberStoreKey(1), true, true}}, @@ -595,7 +595,7 @@ func TestClusterUpdateAttributes(t *testing.T) { c := newTestCluster(t, tt.mems) c.removed = tt.removed - c.UpdateAttributes(types.ID(1), Attributes{Name: name, ClientURLs: clientURLs}) + c.UpdateAttributes(types.ID(1), Attributes{Name: name, ClientURLs: clientURLs}, true) if g := c.Members(); !reflect.DeepEqual(g, tt.wmems) { t.Errorf("#%d: members = %+v, want %+v", i, g, tt.wmems) } diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index f80f89ca3320..2096ecbb45c6 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -54,14 +54,14 @@ type applyResult struct { // applierV3Internal is the interface for processing internal V3 raft request type applierV3Internal interface { - ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) - ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) - DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) + ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool) + ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool) + DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool) } // applierV3 is the interface for processing V3 raft messages type applierV3 interface { - Apply(r *pb.InternalRaftRequest) *applyResult + Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) @@ -130,7 +130,7 @@ func (s *EtcdServer) newApplierV3() applierV3 { ) } -func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { +func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult { op := "unknown" ar := &applyResult{} defer func(start time.Time) { @@ -142,6 +142,25 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { } }(time.Now()) + switch { + case r.ClusterVersionSet != nil: // Implemented in 3.5.x + op = "ClusterVersionSet" + a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3) + return nil + case r.ClusterMemberAttrSet != nil: + op = "ClusterMemberAttrSet" // Implemented in 3.5.x + a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3) + return nil + case r.DowngradeInfoSet != nil: + op = "DowngradeInfoSet" // Implemented in 3.5.x + a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3) + return nil + } + + if !shouldApplyV3 { + return nil + } + // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls switch { case r.Range != nil: @@ -221,15 +240,6 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { case r.AuthRoleList != nil: op = "AuthRoleList" ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList) - case r.ClusterVersionSet != nil: // Implemented in 3.5.x - op = "ClusterVersionSet" - a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet) - case r.ClusterMemberAttrSet != nil: - op = "ClusterMemberAttrSet" // Implemented in 3.5.x - a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet) - case r.DowngradeInfoSet != nil: - op = "DowngradeInfoSet" // Implemented in 3.5.x - a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet) default: a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r)) } @@ -903,26 +913,27 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList return resp, err } -func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) { - a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability) +func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool) { + a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3) } -func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) { +func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool) { a.s.cluster.UpdateAttributes( types.ID(r.Member_ID), membership.Attributes{ Name: r.MemberAttributes.Name, ClientURLs: r.MemberAttributes.ClientUrls, }, + shouldApplyV3, ) } -func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) { +func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool) { d := membership.DowngradeInfo{Enabled: false} if r.Enabled { d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} } - a.s.cluster.SetDowngradeInfo(&d) + a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3) } type quotaApplierV3 struct { diff --git a/server/etcdserver/apply_auth.go b/server/etcdserver/apply_auth.go index c1de09f11b57..140ec847dd52 100644 --- a/server/etcdserver/apply_auth.go +++ b/server/etcdserver/apply_auth.go @@ -41,7 +41,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a return &authApplierV3{applierV3: base, as: as, lessor: lessor} } -func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult { +func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult { aa.mu.Lock() defer aa.mu.Unlock() if r.Header != nil { @@ -57,7 +57,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult { return &applyResult{err: err} } } - ret := aa.applierV3.Apply(r) + ret := aa.applierV3.Apply(r, shouldApplyV3) aa.authInfo.Username = "" aa.authInfo.Revision = 0 return ret diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index e322c29a6805..f2e3d89da57a 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -87,7 +87,7 @@ func (a *applierV2store) Put(r *RequestV2) Response { a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) } if a.cluster != nil { - a.cluster.UpdateAttributes(id, attr) + a.cluster.UpdateAttributes(id, attr, true) } // return an empty response since there is no consumer. return Response{} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 71016e9f7cf8..033ac1c4951a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -560,8 +560,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) kvindex := srv.consistIndex.ConsistentIndex() - srv.lg.Debug("restore consistentIndex", - zap.Uint64("index", kvindex)) + srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. @@ -2027,8 +2026,13 @@ func (s *EtcdServer) apply( es []raftpb.Entry, confState *raftpb.ConfState, ) (appliedt uint64, appliedi uint64, shouldStop bool) { + s.lg.Debug("Applying entries", zap.Int("num-entries", len(es))) for i := range es { e := es[i] + s.lg.Debug("Applying entry", + zap.Uint64("index", e.Index), + zap.Uint64("term", e.Term), + zap.Stringer("type", e.Type)) switch e.Type { case raftpb.EntryNormal: s.applyEntryNormal(&e) @@ -2037,12 +2041,14 @@ func (s *EtcdServer) apply( case raftpb.EntryConfChange: // set the consistent index of current executing entry + shouldApplyV3 := false if e.Index > s.consistIndex.ConsistentIndex() { s.consistIndex.SetConsistentIndex(e.Index) + shouldApplyV3 = true } var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) - removedSelf, err := s.applyConfChange(cc, confState) + removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3) s.setAppliedIndex(e.Index) s.setTerm(e.Term) shouldStop = shouldStop || removedSelf @@ -2092,18 +2098,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { var r pb.Request rp := &r pbutil.MustUnmarshal(rp, e.Data) + s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp)) s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp))) return } + s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) s.w.Trigger(req.ID, s.applyV2Request(req)) return } - // do not re-apply applied entries. - if !shouldApplyV3 { - return - } id := raftReq.ID if id == 0 { @@ -2116,7 +2120,12 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } - ar = s.applyV3.Apply(&raftReq) + ar = s.applyV3.Apply(&raftReq, shouldApplyV3) + } + + // do not re-apply applied entries. + if !shouldApplyV3 { + return } if ar == nil { @@ -2158,7 +2167,7 @@ func (s *EtcdServer) notifyAboutFirstCommitInTerm() { // applyConfChange applies a ConfChange to the server. It is only // invoked with a ConfChange that has already passed through Raft -func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 bool) (bool, error) { if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) @@ -2181,9 +2190,9 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con ) } if confChangeContext.IsPromote { - s.cluster.PromoteMember(confChangeContext.Member.ID) + s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3) } else { - s.cluster.AddMember(&confChangeContext.Member) + s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3) if confChangeContext.Member.ID != s.id { s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs) @@ -2201,7 +2210,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) - s.cluster.RemoveMember(id) + s.cluster.RemoveMember(id, shouldApplyV3) if id == s.id { return true, nil } @@ -2219,7 +2228,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con zap.String("member-id-from-message", m.ID.String()), ) } - s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) + s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3) if m.ID != s.id { s.r.transport.UpdatePeer(m.ID, m.PeerURLs) } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 38d6ef2cfeed..1f1cd1bd907d 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -181,7 +181,7 @@ func TestApplyRepeat(t *testing.T) { cl := newTestCluster(nil) st := v2store.New() cl.SetStore(v2store.New()) - cl.AddMember(&membership.Member{ID: 1234}) + cl.AddMember(&membership.Member{ID: 1234}, true) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: n, @@ -509,9 +509,9 @@ func TestApplyConfChangeError(t *testing.T) { cl := membership.NewCluster(zap.NewExample(), "") cl.SetStore(v2store.New()) for i := 1; i <= 4; i++ { - cl.AddMember(&membership.Member{ID: types.ID(i)}) + cl.AddMember(&membership.Member{ID: types.ID(i)}, true) } - cl.RemoveMember(4) + cl.RemoveMember(4, true) attr := membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} ctx, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr}) @@ -576,7 +576,7 @@ func TestApplyConfChangeError(t *testing.T) { r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}), cluster: cl, } - _, err := srv.applyConfChange(tt.cc, nil) + _, err := srv.applyConfChange(tt.cc, nil, true) if err != tt.werr { t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) } @@ -597,7 +597,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { cl := membership.NewCluster(zap.NewExample(), "") cl.SetStore(v2store.New()) for i := 1; i <= 3; i++ { - cl.AddMember(&membership.Member{ID: types.ID(i)}) + cl.AddMember(&membership.Member{ID: types.ID(i)}, true) } r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), @@ -616,7 +616,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { NodeID: 2, } // remove non-local member - shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}) + shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}, true) if err != nil { t.Fatalf("unexpected error %v", err) } @@ -626,7 +626,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { // remove local member cc.NodeID = 1 - shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}) + shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}, true) if err != nil { t.Fatalf("unexpected error %v", err) } @@ -640,7 +640,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { cl := membership.NewCluster(zap.NewExample(), "") cl.SetStore(v2store.New()) - cl.AddMember(&membership.Member{ID: types.ID(1)}) + cl.AddMember(&membership.Member{ID: types.ID(1)}, true) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: newNodeNop(), @@ -688,7 +688,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { cl := membership.NewCluster(zap.NewExample(), "") cl.SetStore(v2store.New()) for i := 1; i <= 5; i++ { - cl.AddMember(&membership.Member{ID: types.ID(i)}) + cl.AddMember(&membership.Member{ID: types.ID(i)}, true) } r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), @@ -1342,7 +1342,7 @@ func TestRemoveMember(t *testing.T) { cl := newTestCluster(nil) st := v2store.New() cl.SetStore(v2store.New()) - cl.AddMember(&membership.Member{ID: 1234}) + cl.AddMember(&membership.Member{ID: 1234}, true) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: n, @@ -1386,7 +1386,7 @@ func TestUpdateMember(t *testing.T) { cl := newTestCluster(nil) st := v2store.New() cl.SetStore(st) - cl.AddMember(&membership.Member{ID: 1234}) + cl.AddMember(&membership.Member{ID: 1234}, true) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: n, @@ -1874,7 +1874,7 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { func newTestCluster(membs []*membership.Member) *membership.RaftCluster { c := membership.NewCluster(zap.NewExample(), "") for _, m := range membs { - c.AddMember(m) + c.AddMember(m, true) } return c } diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 41303ee3a2a7..2256902fee19 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -43,7 +43,6 @@ var ( ErrCompacted = errors.New("mvcc: required revision has been compacted") ErrFutureRev = errors.New("mvcc: required revision is a future revision") - ErrCanceled = errors.New("mvcc: watcher is canceled") ) const ( @@ -438,6 +437,10 @@ func (s *store) restore() error { tx.Unlock() + s.lg.Info("kvstore restored", + zap.Uint64("consistent-index", s.ConsistentIndex()), + zap.Int64("current-rev", s.currentRev)) + if scheduledCompact != 0 { if _, err := s.compactLockfree(scheduledCompact); err != nil { s.lg.Warn("compaction encountered error", zap.Error(err)) diff --git a/tests/go.mod b/tests/go.mod index 1e28eb32810a..3c00a6171a68 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -26,6 +26,7 @@ require ( github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.1.1 github.com/spf13/pflag v1.0.5 + github.com/stretchr/testify v1.5.1 go.etcd.io/bbolt v1.3.5 go.etcd.io/etcd/api/v3 v3.5.0-alpha.0 go.etcd.io/etcd/client/pkg/v3 v3.5.0-alpha.0 diff --git a/tests/integration/clientv3/ordering_kv_test.go b/tests/integration/clientv3/ordering_kv_test.go index 184c09bc1508..f7d984be0b15 100644 --- a/tests/integration/clientv3/ordering_kv_test.go +++ b/tests/integration/clientv3/ordering_kv_test.go @@ -20,13 +20,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/ordering" "go.etcd.io/etcd/tests/v3/integration" ) func TestDetectKvOrderViolation(t *testing.T) { - var errOrderViolation = errors.New("Detected Order Violation") + var errOrderViolation = errors.New("DetectedOrderViolation") integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) @@ -43,7 +44,7 @@ func TestDetectKvOrderViolation(t *testing.T) { if err != nil { t.Fatal(err) } - defer cli.Close() + defer func() { assert.NoError(t, cli.Close()) }() ctx := context.TODO() if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil { @@ -69,27 +70,31 @@ func TestDetectKvOrderViolation(t *testing.T) { func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error { return errOrderViolation }) - _, err = orderingKv.Get(ctx, "foo") + v, err := orderingKv.Get(ctx, "foo") if err != nil { t.Fatal(err) } + t.Logf("Read from the first member: v:%v err:%v", v, err) + assert.Equal(t, []byte("buzz"), v.Kvs[0].Value) // ensure that only the third member is queried during requests clus.Members[0].Stop(t) clus.Members[1].Stop(t) - clus.Members[2].Restart(t) + assert.NoError(t, clus.Members[2].Restart(t)) // force OrderingKv to query the third member cli.SetEndpoints(clus.Members[2].GRPCAddr()) time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed - _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable()) + t.Logf("Quering m2 after restart") + v, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable()) + t.Logf("Quering m2 returned: v:%v erro:%v ", v, err) if err != errOrderViolation { - t.Fatalf("expected %v, got %v", errOrderViolation, err) + t.Fatalf("expected %v, got err:%v v:%v", errOrderViolation, err, v) } } func TestDetectTxnOrderViolation(t *testing.T) { - var errOrderViolation = errors.New("Detected Order Violation") + var errOrderViolation = errors.New("DetectedOrderViolation") integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) @@ -106,7 +111,7 @@ func TestDetectTxnOrderViolation(t *testing.T) { if err != nil { t.Fatal(err) } - defer cli.Close() + defer func() { assert.NoError(t, cli.Close()) }() ctx := context.TODO() if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil { @@ -144,7 +149,7 @@ func TestDetectTxnOrderViolation(t *testing.T) { // ensure that only the third member is queried during requests clus.Members[0].Stop(t) clus.Members[1].Stop(t) - clus.Members[2].Restart(t) + assert.NoError(t, clus.Members[2].Restart(t)) // force OrderingKv to query the third member cli.SetEndpoints(clus.Members[2].GRPCAddr()) time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed