Skip to content

Commit

Permalink
Applying consistency fix: ClusterVersionSet (and co) might get no app…
Browse files Browse the repository at this point in the history
…lied on v2store

ClusterVersionSet, ClusterMemberAttrSet, DowngradeInfoSet functions are
writing both to V2store and backend. Prior this CL there were
in a branch not executed if shouldApplyV3 was false,
e.g. during restore when Backend is up-to-date (has high
consistency-index) while v2store requires replay from WAL log.

The most serious consequence of this bug was that v2store after restore
could have different index (revision) than the same exact store before restore,
so potentially different content between replicas.

Also this change is supressing double-applying of Membership
(ClusterConfig) changes on Backend (store v3) - that lackilly are not
part of MVCC/KeyValue store, so they didn't caused Revisions to be
bumped.

Inspired by jingyih@ comment:
etcd-io#12820 (comment)
  • Loading branch information
ptabor committed Apr 13, 2021
1 parent 7f97dfd commit 7695db6
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 82 deletions.
6 changes: 3 additions & 3 deletions etcdctl/ctlv3/command/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,15 @@ func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.AddMember(m)
cl.AddMember(m, true)
case raftpb.ConfChangeRemoveNode:
cl.RemoveMember(types.ID(cc.NodeID))
cl.RemoveMember(types.ID(cc.NodeID), true)
case raftpb.ConfChangeUpdateNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes, true)
}
}

Expand Down
2 changes: 1 addition & 1 deletion etcdctl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (s *v3Manager) saveWALAndSnap() error {
st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
s.cl.SetStore(st)
for _, m := range s.cl.Members() {
s.cl.AddMember(m)
s.cl.AddMember(m, true)
}

m := s.cl.MemberByName(s.name)
Expand Down
29 changes: 15 additions & 14 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
// TODO: this must be switched to backend as well.
members, removed := membersFromStore(c.lg, c.v2store)
id := types.ID(cc.NodeID)
if removed[id] {
Expand Down Expand Up @@ -370,13 +371,13 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
// AddMember adds a new Member into the cluster, and saves the given member's
// raftAttributes into the store. The given member should have empty attributes.
// A Member with a matching id must not exist.
func (c *RaftCluster) AddMember(m *Member) {
func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 bool) {
c.Lock()
defer c.Unlock()
if c.v2store != nil {
mustSaveMemberToStore(c.lg, c.v2store, m)
}
if c.be != nil {
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
}

Expand All @@ -393,13 +394,13 @@ func (c *RaftCluster) AddMember(m *Member) {

// RemoveMember removes a member from the store.
// The given id MUST exist, or the function panics.
func (c *RaftCluster) RemoveMember(id types.ID) {
func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 bool) {
c.Lock()
defer c.Unlock()
if c.v2store != nil {
mustDeleteMemberFromStore(c.lg, c.v2store, id)
}
if c.be != nil {
if c.be != nil && shouldApplyV3 {
mustDeleteMemberFromBackend(c.be, id)
}

Expand All @@ -425,7 +426,7 @@ func (c *RaftCluster) RemoveMember(id types.ID) {
}
}

func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 bool) {
c.Lock()
defer c.Unlock()

Expand All @@ -434,7 +435,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
if c.v2store != nil {
mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
}
if c.be != nil {
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
}
return
Expand All @@ -459,15 +460,15 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
}

// PromoteMember marks the member's IsLearner RaftAttributes to false.
func (c *RaftCluster) PromoteMember(id types.ID) {
func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 bool) {
c.Lock()
defer c.Unlock()

c.members[id].RaftAttributes.IsLearner = false
if c.v2store != nil {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil {
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
}

Expand All @@ -478,15 +479,15 @@ func (c *RaftCluster) PromoteMember(id types.ID) {
)
}

func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 bool) {
c.Lock()
defer c.Unlock()

c.members[id].RaftAttributes = raftAttr
if c.v2store != nil {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil {
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
}

Expand All @@ -508,7 +509,7 @@ func (c *RaftCluster) Version() *semver.Version {
return semver.Must(semver.NewVersion(c.version.String()))
}

func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version)) {
func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 bool) {
c.Lock()
defer c.Unlock()
if c.version != nil {
Expand All @@ -533,7 +534,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
if c.v2store != nil {
mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
}
if c.be != nil {
if c.be != nil && shouldApplyV3 {
mustSaveClusterVersionToBackend(c.be, ver)
}
if oldVer != nil {
Expand Down Expand Up @@ -809,11 +810,11 @@ func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
return d
}

func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo) {
func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 bool) {
c.Lock()
defer c.Unlock()

if c.be != nil {
if c.be != nil && shouldApplyV3 {
mustSaveDowngradeToBackend(c.lg, c.be, d)
}

Expand Down
14 changes: 7 additions & 7 deletions server/etcdserver/api/membership/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
cl.SetStore(v2store.New())
for i := 1; i <= 4; i++ {
attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr})
cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}, true)
}
cl.RemoveMember(4)
cl.RemoveMember(4, true)

attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestClusterGenID(t *testing.T) {
previd := cs.ID()

cs.SetStore(mockstore.NewNop())
cs.AddMember(newTestMember(3, nil, "", nil))
cs.AddMember(newTestMember(3, nil, "", nil), true)
cs.genID()
if cs.ID() == previd {
t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
Expand Down Expand Up @@ -489,7 +489,7 @@ func TestClusterAddMember(t *testing.T) {
st := mockstore.NewRecorder()
c := newTestCluster(t, nil)
c.SetStore(st)
c.AddMember(newTestMember(1, nil, "node1", nil))
c.AddMember(newTestMember(1, nil, "node1", nil), true)

wactions := []testutil.Action{
{
Expand All @@ -512,7 +512,7 @@ func TestClusterAddMemberAsLearner(t *testing.T) {
st := mockstore.NewRecorder()
c := newTestCluster(t, nil)
c.SetStore(st)
c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil))
c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil), true)

wactions := []testutil.Action{
{
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestClusterRemoveMember(t *testing.T) {
st := mockstore.NewRecorder()
c := newTestCluster(t, nil)
c.SetStore(st)
c.RemoveMember(1)
c.RemoveMember(1, true)

wactions := []testutil.Action{
{Name: "Delete", Params: []interface{}{MemberStoreKey(1), true, true}},
Expand Down Expand Up @@ -595,7 +595,7 @@ func TestClusterUpdateAttributes(t *testing.T) {
c := newTestCluster(t, tt.mems)
c.removed = tt.removed

c.UpdateAttributes(types.ID(1), Attributes{Name: name, ClientURLs: clientURLs})
c.UpdateAttributes(types.ID(1), Attributes{Name: name, ClientURLs: clientURLs}, true)
if g := c.Members(); !reflect.DeepEqual(g, tt.wmems) {
t.Errorf("#%d: members = %+v, want %+v", i, g, tt.wmems)
}
Expand Down
49 changes: 30 additions & 19 deletions server/etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ type applyResult struct {

// applierV3Internal is the interface for processing internal V3 raft request
type applierV3Internal interface {
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest)
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest)
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool)
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool)
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool)
}

// applierV3 is the interface for processing V3 raft messages
type applierV3 interface {
Apply(r *pb.InternalRaftRequest) *applyResult
Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult

Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
Expand Down Expand Up @@ -130,7 +130,7 @@ func (s *EtcdServer) newApplierV3() applierV3 {
)
}

func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult {
op := "unknown"
ar := &applyResult{}
defer func(start time.Time) {
Expand All @@ -142,6 +142,25 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
}
}(time.Now())

switch {
case r.ClusterVersionSet != nil: // Implemented in 3.5.x
op = "ClusterVersionSet"
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
return nil
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
return nil
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet" // Implemented in 3.5.x
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
return nil
}

if !shouldApplyV3 {
return nil
}

// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {
case r.Range != nil:
Expand Down Expand Up @@ -221,15 +240,6 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
case r.AuthRoleList != nil:
op = "AuthRoleList"
ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
case r.ClusterVersionSet != nil: // Implemented in 3.5.x
op = "ClusterVersionSet"
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet" // Implemented in 3.5.x
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
default:
a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
}
Expand Down Expand Up @@ -903,26 +913,27 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList
return resp, err
}

func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) {
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability)
func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool) {
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3)
}

func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) {
func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool) {
a.s.cluster.UpdateAttributes(
types.ID(r.Member_ID),
membership.Attributes{
Name: r.MemberAttributes.Name,
ClientURLs: r.MemberAttributes.ClientUrls,
},
shouldApplyV3,
)
}

func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) {
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool) {
d := membership.DowngradeInfo{Enabled: false}
if r.Enabled {
d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
}
a.s.cluster.SetDowngradeInfo(&d)
a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3)
}

type quotaApplierV3 struct {
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a
return &authApplierV3{applierV3: base, as: as, lessor: lessor}
}

func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult {
aa.mu.Lock()
defer aa.mu.Unlock()
if r.Header != nil {
Expand All @@ -57,7 +57,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
return &applyResult{err: err}
}
}
ret := aa.applierV3.Apply(r)
ret := aa.applierV3.Apply(r, shouldApplyV3)
aa.authInfo.Username = ""
aa.authInfo.Revision = 0
return ret
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (a *applierV2store) Put(r *RequestV2) Response {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr)
a.cluster.UpdateAttributes(id, attr, true)
}
// return an empty response since there is no consumer.
return Response{}
Expand Down
Loading

0 comments on commit 7695db6

Please sign in to comment.