Skip to content

Commit

Permalink
Changes to generate v2 snapshot from v3 state
Browse files Browse the repository at this point in the history
Signed-off-by: Geeta Gharpure <[email protected]>
  • Loading branch information
Geeta Gharpure committed Aug 17, 2023
1 parent ea16ed8 commit 3590ce6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 37 deletions.
9 changes: 2 additions & 7 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2067,7 +2067,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
clone := s.v2store.Clone()
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
Expand All @@ -2078,16 +2077,12 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)

s.GoAttach(func() {
lg := s.Logger()

d, err := clone.SaveNoCopy()
// TODO: current store will never fail to do a snapshot
// what should we do if the store might fail?
if err != nil {
lg.Panic("failed to save v2 store", zap.Error(err))
}
// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
Expand Down
31 changes: 12 additions & 19 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,10 @@ func TestSnapshot(t *testing.T) {
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
srv.be = be

ch := make(chan struct{}, 2)
cl := membership.NewCluster(zaptest.NewLogger(t))
srv.cluster = cl

ch := make(chan struct{}, 1)

go func() {
gaction, _ := p.Wait(2)
Expand All @@ -1066,24 +1069,11 @@ func TestSnapshot(t *testing.T) {
}
}()

go func() {
gaction, _ := st.Wait(2)
defer func() { ch <- struct{}{} }()

if len(gaction) != 2 {
t.Errorf("len(action) = %d, want 2", len(gaction))
}
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
t.Errorf("action = %s, want Clone", gaction[0])
}
if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
t.Errorf("action = %s, want SaveNoCopy", gaction[1])
}
}()

srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
<-ch
<-ch
if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
}
}

// TestSnapshotOrdering ensures raft persists snapshot onto disk before
Expand All @@ -1098,7 +1088,8 @@ func TestSnapshotOrdering(t *testing.T) {
n := newNopReadyNode()
st := v2store.New()
cl := membership.NewCluster(lg)
cl.SetStore(st)
be, _ := betesting.NewDefaultTmpBackend(t)
cl.SetBackend(schema.NewMembershipBackend(lg, be))

testdir := t.TempDir()

Expand All @@ -1118,7 +1109,6 @@ func TestSnapshotOrdering(t *testing.T) {
storage: p,
raftStorage: rs,
})
be, _ := betesting.NewDefaultTmpBackend(t)
ci := cindex.NewConsistentIndex(be)
s := &EtcdServer{
lgMu: new(sync.RWMutex),
Expand Down Expand Up @@ -1211,6 +1201,9 @@ func TestTriggerSnap(t *testing.T) {
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
srv.be = be

cl := membership.NewCluster(zaptest.NewLogger(t))
srv.cluster = cl

srv.start()

donec := make(chan struct{})
Expand Down
6 changes: 1 addition & 5 deletions server/etcdserver/snapshot_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ import (
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
lg := s.Logger()
// get a snapshot of v2 store as []byte
clone := s.v2store.Clone()
d, err := clone.SaveNoCopy()
if err != nil {
lg.Panic("failed to save v2 store data", zap.Error(err))
}
d := GetMembershipInfoInV2Format(lg, s.cluster)

// commit kv to write metadata(for example: consistent index).
s.KV().Commit()
Expand Down
29 changes: 23 additions & 6 deletions tests/e2e/v2store_deprecation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ import (
"bytes"
"context"
"fmt"
"reflect"
"sort"
"strings"
"testing"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/tests/v3/framework/config"
Expand Down Expand Up @@ -125,15 +129,12 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) {
assert.NoError(t, epc.Close())

assertSnapshotsMatch(t, oldMemberDataDir, newMemberDataDir, func(data []byte) []byte {
// Patch cluster version
data = bytes.Replace(data, []byte("3.5.0"), []byte("X.X.X"), -1)
data = bytes.Replace(data, []byte("3.6.0"), []byte("X.X.X"), -1)
// Patch members ids
for i, mid := range members1 {
data = bytes.Replace(data, []byte(fmt.Sprintf("%x", mid)), []byte(fmt.Sprintf("member%d", i+1)), -1)
data = bytes.Replace(data, []byte(fmt.Sprintf("%x", mid)), []byte(fmt.Sprintf("%d", i+1)), -1)
}
for i, mid := range members2 {
data = bytes.Replace(data, []byte(fmt.Sprintf("%x", mid)), []byte(fmt.Sprintf("member%d", i+1)), -1)
data = bytes.Replace(data, []byte(fmt.Sprintf("%x", mid)), []byte(fmt.Sprintf("%d", i+1)), -1)
}
return data
})
Expand Down Expand Up @@ -250,7 +251,23 @@ func assertSnapshotsMatch(t testing.TB, firstDataDir, secondDataDir string, patc
if err != nil {
t.Fatal(err)
}
assert.Equal(t, openSnap(patch(firstSnapshot.Data)), openSnap(patch(secondSnapshot.Data)))
assertMembershipEqual(t, openSnap(patch(firstSnapshot.Data)), openSnap(patch(secondSnapshot.Data)))
}
}

func assertMembershipEqual(t testing.TB, firstStore v2store.Store, secondStore v2store.Store) {
rc1 := membership.NewCluster(zaptest.NewLogger(t))
rc1.SetStore(firstStore)
rc1.Recover(func(lg *zap.Logger, v *semver.Version) { return })

rc2 := membership.NewCluster(zaptest.NewLogger(t))
rc2.SetStore(secondStore)
rc2.Recover(func(lg *zap.Logger, v *semver.Version) { return })

//membership should match
if g := rc1.Members(); !reflect.DeepEqual(g, rc2.Members()) {
t.Logf("memberids_from_last_version = %+v, member_ids_from_current_version = %+v", rc1.MemberIDs(), rc2.MemberIDs())
t.Errorf("members_from_last_version_snapshot = %+v, members_from_current_version_snapshot %+v", rc1.Members(), rc2.Members())
}
}

Expand Down

0 comments on commit 3590ce6

Please sign in to comment.