Skip to content

Commit

Permalink
Membership: Add additional methods to trim/manage membership data in …
Browse files Browse the repository at this point in the history
…backend.
  • Loading branch information
ptabor committed Apr 2, 2021
1 parent fba5694 commit aa34d9c
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 14 deletions.
2 changes: 2 additions & 0 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,11 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
defer c.Unlock()

if c.be != nil {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
c.lg.Info("members recovered from backend:", zap.Any("members", c.members), zap.Any("removed", c.removed))
} else {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
c.lg.Info("members recovered from v2store:", zap.Any("members", c.members), zap.Any("removed", c.removed))
}
Expand Down
34 changes: 20 additions & 14 deletions server/etcdserver/api/membership/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,22 @@ type Member struct {
// NewMember creates a Member without an ID and generates one based on the
// cluster name, peer URLs, and time. This is used for bootstrapping/adding new member.
func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
return newMember(name, peerURLs, clusterName, now, false)
memberId := computeMemberId(peerURLs, clusterName, now)
return newMember(name, peerURLs, memberId, false)
}

// NewMemberAsLearner creates a learner Member without an ID and generates one based on the
// cluster name, peer URLs, and time. This is used for adding new learner member.
func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
return newMember(name, peerURLs, clusterName, now, true)
memberId := computeMemberId(peerURLs, clusterName, now)
return newMember(name, peerURLs, memberId, true)
}

func newMember(name string, peerURLs types.URLs, clusterName string, now *time.Time, isLearner bool) *Member {
m := &Member{
RaftAttributes: RaftAttributes{
PeerURLs: peerURLs.StringSlice(),
IsLearner: isLearner,
},
Attributes: Attributes{Name: name},
}

func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
var b []byte
sort.Strings(m.PeerURLs)
for _, p := range m.PeerURLs {
peerURLstrs := peerURLs.StringSlice()
sort.Strings(peerURLstrs)
for _, p := range peerURLstrs {
b = append(b, []byte(p)...)
}

Expand All @@ -79,7 +74,18 @@ func newMember(name string, peerURLs types.URLs, clusterName string, now *time.T
}

hash := sha1.Sum(b)
m.ID = types.ID(binary.BigEndian.Uint64(hash[:8]))
return types.ID(binary.BigEndian.Uint64(hash[:8]))
}

func newMember(name string, peerURLs types.URLs, memberId types.ID, isLearner bool) *Member {
m := &Member{
RaftAttributes: RaftAttributes{
PeerURLs: peerURLs.StringSlice(),
IsLearner: isLearner,
},
Attributes: Attributes{Name: name},
ID: memberId,
}
return m
}

Expand Down
8 changes: 8 additions & 0 deletions server/etcdserver/api/membership/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
tx.UnsafePut(membersBucketName, mkey, mvalue)
}

func TrimClusterFromBackend(be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDeleteBucket(clusterBucketName)
return nil
}

func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
mkey := backendMemberKey(id)

Expand Down
13 changes: 13 additions & 0 deletions server/mvcc/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type BatchTx interface {
ReadTx
UnsafeCreateBucket(name []byte)
UnsafeDeleteBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeDelete(bucketName []byte, key []byte)
Expand Down Expand Up @@ -80,6 +81,18 @@ func (t *batchTx) UnsafeCreateBucket(name []byte) {
t.pending++
}

func (t *batchTx) UnsafeDeleteBucket(name []byte) {
err := t.tx.DeleteBucket(name)
if err != nil && err != bolt.ErrBucketExists {
t.backend.lg.Fatal(
"failed to delete a bucket",
zap.String("bucket-name", string(name)),
zap.Error(err),
)
}
t.pending++
}

// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.unsafePut(bucketName, key, value, false)
Expand Down
1 change: 1 addition & 0 deletions server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,7 @@ func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) RLock() {}
func (b *fakeBatchTx) RUnlock() {}
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
func (b *fakeBatchTx) UnsafeDeleteBucket(name []byte) {}
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
}
Expand Down

0 comments on commit aa34d9c

Please sign in to comment.