From aa34d9ca8bf0f02b60e961d69ee3cfe4e3a86c53 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 2 Apr 2021 18:01:26 +0200 Subject: [PATCH] Membership: Add additional methods to trim/manage membership data in backend. --- server/etcdserver/api/membership/cluster.go | 2 ++ server/etcdserver/api/membership/member.go | 34 ++++++++++++--------- server/etcdserver/api/membership/store.go | 8 +++++ server/mvcc/backend/batch_tx.go | 13 ++++++++ server/mvcc/kvstore_test.go | 1 + 5 files changed, 44 insertions(+), 14 deletions(-) diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 047493bc981b..fef6b0f27048 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -249,9 +249,11 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { defer c.Unlock() if c.be != nil { + c.version = clusterVersionFromBackend(c.lg, c.be) c.members, c.removed = membersFromBackend(c.lg, c.be) c.lg.Info("members recovered from backend:", zap.Any("members", c.members), zap.Any("removed", c.removed)) } else { + c.version = clusterVersionFromStore(c.lg, c.v2store) c.members, c.removed = membersFromStore(c.lg, c.v2store) c.lg.Info("members recovered from v2store:", zap.Any("members", c.members), zap.Any("removed", c.removed)) } diff --git a/server/etcdserver/api/membership/member.go b/server/etcdserver/api/membership/member.go index 98a00ba95770..2e344c4f2698 100644 --- a/server/etcdserver/api/membership/member.go +++ b/server/etcdserver/api/membership/member.go @@ -49,27 +49,22 @@ type Member struct { // NewMember creates a Member without an ID and generates one based on the // cluster name, peer URLs, and time. This is used for bootstrapping/adding new member. func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member { - return newMember(name, peerURLs, clusterName, now, false) + memberId := computeMemberId(peerURLs, clusterName, now) + return newMember(name, peerURLs, memberId, false) } // NewMemberAsLearner creates a learner Member without an ID and generates one based on the // cluster name, peer URLs, and time. This is used for adding new learner member. func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member { - return newMember(name, peerURLs, clusterName, now, true) + memberId := computeMemberId(peerURLs, clusterName, now) + return newMember(name, peerURLs, memberId, true) } -func newMember(name string, peerURLs types.URLs, clusterName string, now *time.Time, isLearner bool) *Member { - m := &Member{ - RaftAttributes: RaftAttributes{ - PeerURLs: peerURLs.StringSlice(), - IsLearner: isLearner, - }, - Attributes: Attributes{Name: name}, - } - +func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID { var b []byte - sort.Strings(m.PeerURLs) - for _, p := range m.PeerURLs { + peerURLstrs := peerURLs.StringSlice() + sort.Strings(peerURLstrs) + for _, p := range peerURLstrs { b = append(b, []byte(p)...) } @@ -79,7 +74,18 @@ func newMember(name string, peerURLs types.URLs, clusterName string, now *time.T } hash := sha1.Sum(b) - m.ID = types.ID(binary.BigEndian.Uint64(hash[:8])) + return types.ID(binary.BigEndian.Uint64(hash[:8])) +} + +func newMember(name string, peerURLs types.URLs, memberId types.ID, isLearner bool) *Member { + m := &Member{ + RaftAttributes: RaftAttributes{ + PeerURLs: peerURLs.StringSlice(), + IsLearner: isLearner, + }, + Attributes: Attributes{Name: name}, + ID: memberId, + } return m } diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index 004f6dd1a92b..ff7ad29a8c9d 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -57,6 +57,14 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { tx.UnsafePut(membersBucketName, mkey, mvalue) } +func TrimClusterFromBackend(be backend.Backend) error { + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafeDeleteBucket(clusterBucketName) + return nil +} + func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { mkey := backendMemberKey(id) diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index eb75c29fc92c..aeff57210af5 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -28,6 +28,7 @@ import ( type BatchTx interface { ReadTx UnsafeCreateBucket(name []byte) + UnsafeDeleteBucket(name []byte) UnsafePut(bucketName []byte, key []byte, value []byte) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) UnsafeDelete(bucketName []byte, key []byte) @@ -80,6 +81,18 @@ func (t *batchTx) UnsafeCreateBucket(name []byte) { t.pending++ } +func (t *batchTx) UnsafeDeleteBucket(name []byte) { + err := t.tx.DeleteBucket(name) + if err != nil && err != bolt.ErrBucketExists { + t.backend.lg.Fatal( + "failed to delete a bucket", + zap.String("bucket-name", string(name)), + zap.Error(err), + ) + } + t.pending++ +} + // UnsafePut must be called holding the lock on the tx. func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { t.unsafePut(bucketName, key, value, false) diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 854fe8139f76..e25f4aa05696 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -877,6 +877,7 @@ func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} func (b *fakeBatchTx) RUnlock() {} func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {} +func (b *fakeBatchTx) UnsafeDeleteBucket(name []byte) {} func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}}) }