Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: refactor mix_version_test. #17776

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 91 additions & 103 deletions tests/e2e/etcd_mix_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,66 +28,81 @@ import (
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

type clusterTestCase struct {
name string
config *e2e.EtcdProcessClusterConfig
}

func clusterTestCases(size int) []clusterTestCase {
tcs := []clusterTestCase{
{
name: "CurrentVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size)),
},
}
if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
return tcs
}

tcs = append(tcs,
clusterTestCase{
name: "LastVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.LastVersion)),
},
)
if size > 2 {
tcs = append(tcs,
clusterTestCase{
name: "MinorityLastVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.MinorityLastVersion)),
}, clusterTestCase{
name: "QuorumLastVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.QuorumLastVersion)),
},
)
}
return tcs
}

// TestMixVersionsSnapshotByAddingMember tests the mix version send snapshots by adding member
func TestMixVersionsSnapshotByAddingMember(t *testing.T) {
cases := []struct {
name string
clusterVersion e2e.ClusterVersion
newInstanceVersion e2e.ClusterVersion
}{
for _, tc := range clusterTestCases(1) {
t.Run(tc.name+"-adding-new-member-of-current-version", func(t *testing.T) {
mixVersionsSnapshotTestByAddingMember(t, tc.config, e2e.CurrentVersion)
})
// etcd doesn't support adding a new member of old version into
// a cluster with higher version. For example, etcd cluster
// version is 3.6.x, then a new member of 3.5.x can't join the
// cluster. Please refer to link below,
// https://github.com/etcd-io/etcd/blob/3e903d0b12e399519a4013c52d4635ec8bdd6863/server/etcdserver/cluster_util.go#L222-L230
/*{
name: "etcd instance with last version receives snapshot from the leader with current version",
clusterVersion: e2e.CurrentVersion,
newInstaceVersion: e2e.LastVersion,
},*/
{
name: "etcd instance with current version receives snapshot from the leader with last version",
clusterVersion: e2e.LastVersion,
newInstanceVersion: e2e.CurrentVersion,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mixVersionsSnapshotTestByAddingMember(t, tc.clusterVersion, tc.newInstanceVersion)
})
/*t.Run(tc.name+"-adding-new-member-of-last-version", func(t *testing.T) {
mixVersionsSnapshotTestByAddingMember(t, tc.config, e2e.LastVersion)
})*/
}
}

func mixVersionsSnapshotTestByAddingMember(t *testing.T, clusterVersion, newInstanceVersion e2e.ClusterVersion) {
func mixVersionsSnapshotTestByAddingMember(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, newInstanceVersion e2e.ClusterVersion) {
e2e.BeforeTest(t)

if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease)
}

// Create an etcd cluster with 1 member
t.Logf("Create an etcd cluster with %d member", cfg.ClusterSize)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithClusterSize(1),
e2e.WithConfig(cfg),
e2e.WithSnapshotCount(10),
e2e.WithVersion(clusterVersion),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
derr := epc.Close()
require.NoError(t, derr, "failed to close etcd cluster: %v", derr)
}()

// Write more than SnapshotCount entries to trigger at least a snapshot.
t.Log("Writing 20 keys to the cluster")
for i := 0; i < 20; i++ {
key := fmt.Sprintf("key-%d", i)
value := fmt.Sprintf("value-%d", i)
err = epc.Etcdctl().Put(context.TODO(), key, value, config.PutOptions{})
require.NoError(t, err, "failed to put %q, error: %v", key, err)
}
t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)")
writeKVs(t, epc.Etcdctl(), 0, 20)

// start a new etcd instance, which will receive a snapshot from the leader.
t.Log("start a new etcd instance, which will receive a snapshot from the leader.")
newCfg := *epc.Cfg
newCfg.Version = newInstanceVersion
newCfg.ServerConfig.SnapshotCatchUpEntries = 10
Expand All @@ -96,62 +111,29 @@ func mixVersionsSnapshotTestByAddingMember(t *testing.T, clusterVersion, newInst
require.NoError(t, err, "failed to start the new etcd instance: %v", err)
defer epc.CloseProc(context.TODO(), nil)

// verify all nodes have exact same revision and hash
t.Log("Verify all nodes have exact same revision and hash")
assert.Eventually(t, func() bool {
hashKvs, err := epc.Etcdctl().HashKV(context.TODO(), 0)
if err != nil {
t.Logf("failed to get HashKV: %v", err)
return false
}
if len(hashKvs) != 2 {
t.Logf("expected 2 hashkv responses, but got: %d", len(hashKvs))
return false
}

if hashKvs[0].Header.Revision != hashKvs[1].Header.Revision {
t.Logf("Got different revisions, [%d, %d]", hashKvs[0].Header.Revision, hashKvs[1].Header.Revision)
return false
}

assert.Equal(t, hashKvs[0].Hash, hashKvs[1].Hash)

return true
}, 10*time.Second, 500*time.Millisecond)
assertKVHash(t, epc)
}

func TestMixVersionsSnapshotByMockingPartition(t *testing.T) {
cases := []struct {
name string
clusterVersion e2e.ClusterVersion
mockPartitionNodeIndex int
}{
{
name: "etcd instance with last version receives snapshot from the leader with current version",
clusterVersion: e2e.MinorityLastVersion,
mockPartitionNodeIndex: 2,
},
}

for _, tc := range cases {
mockPartitionNodeIndex := 2
for _, tc := range clusterTestCases(3) {
t.Run(tc.name, func(t *testing.T) {
mixVersionsSnapshotTestByMockPartition(t, tc.clusterVersion, tc.mockPartitionNodeIndex)
mixVersionsSnapshotTestByMockPartition(t, tc.config, mockPartitionNodeIndex)
})
}
}

func mixVersionsSnapshotTestByMockPartition(t *testing.T, clusterVersion e2e.ClusterVersion, mockPartitionNodeIndex int) {
func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, mockPartitionNodeIndex int) {
e2e.BeforeTest(t)

if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease)
}

// Create an etcd cluster with 3 member of MinorityLastVersion
t.Logf("Create an etcd cluster with %d member", cfg.ClusterSize)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithClusterSize(3),
e2e.WithConfig(cfg),
e2e.WithSnapshotCount(10),
e2e.WithVersion(clusterVersion),
e2e.WithSnapshotCatchUpEntries(10),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
Expand All @@ -161,57 +143,63 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, clusterVersion e2e.Clu
}()
toPartitionedMember := epc.Procs[mockPartitionNodeIndex]

// Stop and restart the partitioned member
t.Log("Stop and restart the partitioned member")
err = toPartitionedMember.Stop()
require.NoError(t, err)

// Write more than SnapshotCount entries to trigger at least a snapshot.
t.Log("Writing 20 keys to the cluster")
for i := 0; i < 20; i++ {
key := fmt.Sprintf("key-%d", i)
value := fmt.Sprintf("value-%d", i)
err = epc.Etcdctl().Put(context.TODO(), key, value, config.PutOptions{})
require.NoError(t, err, "failed to put %q, error: %v", key, err)
}
t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)")
writeKVs(t, epc.Etcdctl(), 0, 20)

t.Log("Verify logs to check leader has saved snapshot")
leaderEPC := epc.Procs[epc.WaitLeader(t)]
e2e.AssertProcessLogs(t, leaderEPC, "saved snapshot")

// Restart the partitioned member
t.Log("Restart the partitioned member")
err = toPartitionedMember.Restart(context.TODO())
require.NoError(t, err)

// verify all nodes have exact same revision and hash
assertKVHash(t, epc)

leaderEPC = epc.Procs[epc.WaitLeader(t)]
if leaderEPC.Config().ExecPath == e2e.BinPath.Etcd {
t.Log("Verify logs to check snapshot be sent from leader to follower")
e2e.AssertProcessLogs(t, leaderEPC, "sent database snapshot")
}
}

func writeKVs(t *testing.T, etcdctl *e2e.EtcdctlV3, startIdx, endIdx int) {
for i := startIdx; i < endIdx; i++ {
key := fmt.Sprintf("key-%d", i)
value := fmt.Sprintf("value-%d", i)
err := etcdctl.Put(context.TODO(), key, value, config.PutOptions{})
require.NoError(t, err, "failed to put %q, error: %v", key, err)
}
}

func assertKVHash(t *testing.T, epc *e2e.EtcdProcessCluster) {
clusterSize := len(epc.Procs)
if clusterSize < 2 {
return
}
t.Log("Verify all nodes have exact same revision and hash")
assert.Eventually(t, func() bool {
hashKvs, err := epc.Etcdctl().HashKV(context.TODO(), 0)
if err != nil {
t.Logf("failed to get HashKV: %v", err)
return false
}
if len(hashKvs) != 3 {
t.Logf("expected 3 hashkv responses, but got: %d", len(hashKvs))
if len(hashKvs) != clusterSize {
t.Logf("expected %d hashkv responses, but got: %d", clusterSize, len(hashKvs))
return false
}
for i := 1; i < clusterSize; i++ {
if hashKvs[0].Header.Revision != hashKvs[i].Header.Revision {
t.Logf("Got different revisions, [%d, %d]", hashKvs[0].Header.Revision, hashKvs[1].Header.Revision)
return false
}

if hashKvs[0].Header.Revision != hashKvs[1].Header.Revision {
t.Logf("Got different revisions, [%d, %d]", hashKvs[0].Header.Revision, hashKvs[1].Header.Revision)
return false
}
if hashKvs[1].Header.Revision != hashKvs[2].Header.Revision {
t.Logf("Got different revisions, [%d, %d]", hashKvs[1].Header.Revision, hashKvs[2].Header.Revision)
return false
assert.Equal(t, hashKvs[0].Hash, hashKvs[i].Hash)
}

assert.Equal(t, hashKvs[0].Hash, hashKvs[1].Hash)
assert.Equal(t, hashKvs[1].Hash, hashKvs[2].Hash)

return true
}, 10*time.Second, 500*time.Millisecond)

// assert process logs to check snapshot be sent
t.Log("Verify logs to check snapshot be sent from leader to follower")
leaderEPC = epc.Procs[epc.WaitLeader(t)]
e2e.AssertProcessLogs(t, leaderEPC, "sent database snapshot")
}
Loading