From 3108ce65903aad75d2f5a7eada69e2d43104fadb Mon Sep 17 00:00:00 2001
From: Siyuan Zhang
Date: Thu, 4 Apr 2024 21:56:18 +0000
Subject: [PATCH] add new TestSnapshotByMockingPartition using blackholing.

Signed-off-by: Siyuan Zhang
---
 tests/e2e/etcd_snapshot_test.go | 152 ++++++++++++++++++++++++++------
 tests/framework/e2e/cluster.go  |  10 +++
 tests/framework/e2e/etcdctl.go  |  12 +++
 3 files changed, 149 insertions(+), 25 deletions(-)

diff --git a/tests/e2e/etcd_snapshot_test.go b/tests/e2e/etcd_snapshot_test.go
index 6659100c3ca..79255a194fb 100644
--- a/tests/e2e/etcd_snapshot_test.go
+++ b/tests/e2e/etcd_snapshot_test.go
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:build !cluster_proxy
+
 package e2e
 
 import (
@@ -58,18 +60,32 @@ func clusterTestCases(size int) []clusterTestCase {
 	}
 
 	if fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
-		tcs = append(tcs, clusterTestCase{
-			name:   "LastVersion",
-			config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.LastVersion)),
-		})
-		if size > 2 {
-			tcs = append(tcs, clusterTestCase{
-				name:   "MinorityLastVersion",
-				config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.MinorityLastVersion)),
+		tcs = append(tcs,
+			clusterTestCase{
+				name:   "LastVersion",
+				config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.LastVersion)),
 			}, clusterTestCase{
-				name:   "QuorumLastVersion",
-				config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.QuorumLastVersion)),
-			})
+				name:   "LastVersionPeerTLS",
+				config: e2e.NewConfigPeerTLS().With(e2e.WithClusterSize(size), e2e.WithVersion(e2e.LastVersion)),
+			},
+		)
+		if size > 2 {
+			tcs = append(tcs,
+				clusterTestCase{
+					name:   "MinorityLastVersion",
+					config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.MinorityLastVersion)),
+				}, clusterTestCase{
+					name:   "QuorumLastVersion",
+					config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.QuorumLastVersion)),
+				},
+				clusterTestCase{
+					name:   "MinorityLastVersionPeerTLS",
+					config: e2e.NewConfigPeerTLS().With(e2e.WithClusterSize(size), e2e.WithVersion(e2e.MinorityLastVersion)),
+				}, clusterTestCase{
+					name:   "QuorumLastVersionPeerTLS",
+					config: e2e.NewConfigPeerTLS().With(e2e.WithClusterSize(size), e2e.WithVersion(e2e.QuorumLastVersion)),
+				},
+			)
 		}
 	}
 	return tcs
@@ -112,12 +128,7 @@ func snapshotTestByAddingMember(t *testing.T, clusterConfig *e2e.EtcdProcessClus
 	}()
 
 	t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)")
-	for i := 0; i < 20; i++ {
-		key := fmt.Sprintf("key-%d", i)
-		value := fmt.Sprintf("value-%d", i)
-		err = epc.Etcdctl().Put(context.TODO(), key, value, config.PutOptions{})
-		require.NoError(t, err, "failed to put %q, error: %v", key, err)
-	}
+	writeKVs(t, epc.Etcdctl(), 0, 20)
 
 	t.Log("Start a new etcd instance, which will receive a snapshot from the leader.")
 	newCfg := *epc.Cfg
@@ -161,12 +172,7 @@ func snapshotTestByRestartingMember(t *testing.T, clusterConfig *e2e.EtcdProcess
 	require.NoError(t, err)
 
 	t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot.)")
-	for i := 0; i < 20; i++ {
-		key := fmt.Sprintf("key-%d", i)
-		value := fmt.Sprintf("value-%d", i)
-		err = epc.Etcdctl().Put(context.TODO(), key, value, config.PutOptions{})
-		require.NoError(t, err, "failed to put %q, error: %v", key, err)
-	}
+	writeKVs(t, epc.Etcdctl(), 0, 20)
 
 	t.Log("Verify logs to check leader has saved snapshot")
 	leaderEPC := epc.Procs[epc.WaitLeader(t)]
@@ -179,13 +185,90 @@ func snapshotTestByRestartingMember(t *testing.T, clusterConfig *e2e.EtcdProcess
 	assertKVHash(t, clusterConfig.ClusterSize, epc)
 
 	// assert process logs to check snapshot be sent
-	if clusterConfig.Version == e2e.CurrentVersion || clusterConfig.Version == e2e.MinorityLastVersion {
+	leaderEPC = epc.Procs[epc.WaitLeader(t)]
+	if leaderEPC.Config().ExecPath == e2e.BinPath.Etcd {
+		t.Log("Verify logs to check snapshot be sent from leader to follower")
+		e2e.AssertProcessLogs(t, leaderEPC, "sent database snapshot")
+	}
+}
+
+func TestSnapshotByMockingPartition(t *testing.T) {
+	mockPartitionNodeIndex := 2
+	for _, tc := range clusterTestCases(3) {
+		if !tc.config.IsPeerTLS {
+			continue
+		}
+		t.Run(tc.name, func(t *testing.T) {
+			snapshotTestByMockingPartition(t, tc.config, mockPartitionNodeIndex)
+		})
+	}
+}
+
+func snapshotTestByMockingPartition(t *testing.T, clusterConfig *e2e.EtcdProcessClusterConfig, mockPartitionNodeIndex int) {
+	e2e.BeforeTest(t)
+
+	t.Logf("Create an etcd cluster with %d member\n", clusterConfig.ClusterSize)
+	epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
+		e2e.WithConfig(clusterConfig),
+		e2e.WithSnapshotCount(10),
+		e2e.WithSnapshotCatchUpEntries(10),
+		e2e.WithPeerProxy(true),
+	)
+	require.NoError(t, err, "failed to start etcd cluster: %v", err)
+	defer func() {
+		require.NoError(t, epc.Close(), "failed to close etcd cluster")
+	}()
+
+	leaderId := epc.WaitLeader(t)
+	partitionedMember := epc.Procs[mockPartitionNodeIndex]
+	if leaderId != mockPartitionNodeIndex {
+		partitionedMemberId, err := epc.MemberId(mockPartitionNodeIndex)
+		require.NoError(t, err)
+		// If the partitioned member is not the original leader, Blackhole would not block all its communication with other members.
+		t.Logf("Move leader to Proc[%d]: %d\n", mockPartitionNodeIndex, partitionedMemberId)
+		require.NoError(t, epc.Etcdctl().MoveLeader(context.TODO(), partitionedMemberId))
+		epc.WaitLeader(t)
+	}
+	// Mock partition
+	proxy := partitionedMember.PeerProxy()
+	t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
+	proxy.BlackholeTx()
+	proxy.BlackholeRx()
+	time.Sleep(2 * time.Second)
+
+	t.Logf("Wait for new leader election with remaining members")
+	leaderEPC := epc.Procs[waitLeader(t, epc, mockPartitionNodeIndex)]
+	t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot.)")
+	writeKVs(t, leaderEPC.Etcdctl(), 0, 20)
+	e2e.AssertProcessLogs(t, leaderEPC, "saved snapshot")
+	assertRevision(t, leaderEPC, 21)
+	assertRevision(t, partitionedMember, 1)
+
+	// Wait for some time to restore the network
+	time.Sleep(1 * time.Second)
+	t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
+	proxy.UnblackholeTx()
+	proxy.UnblackholeRx()
+
+	assertKVHash(t, clusterConfig.ClusterSize, epc)
+
+	// assert process logs to check snapshot be sent
+	leaderEPC = epc.Procs[epc.WaitLeader(t)]
+	if leaderEPC.Config().ExecPath == e2e.BinPath.Etcd {
 		t.Log("Verify logs to check snapshot be sent from leader to follower")
-		leaderEPC = epc.Procs[epc.WaitLeader(t)]
 		e2e.AssertProcessLogs(t, leaderEPC, "sent database snapshot")
 	}
 }
 
+func writeKVs(t *testing.T, etcdctl *e2e.EtcdctlV3, startIdx, endIdx int) {
+	for i := startIdx; i < endIdx; i++ {
+		key := fmt.Sprintf("key-%d", i)
+		value := fmt.Sprintf("value-%d", i)
+		err := etcdctl.Put(context.TODO(), key, value, config.PutOptions{})
+		require.NoError(t, err, "failed to put %q, error: %v", key, err)
+	}
+}
+
 func assertKVHash(t *testing.T, clusterSize int, epc *e2e.EtcdProcessCluster) {
 	if clusterSize < 2 {
 		return
@@ -212,3 +295,22 @@ func assertKVHash(t *testing.T, clusterSize int, epc *e2e.EtcdProcessCluster) {
 		return true
 	}, 10*time.Second, 500*time.Millisecond)
 }
+
+func waitLeader(t testing.TB, epc *e2e.EtcdProcessCluster, excludeNode int) int {
+	var membs []e2e.EtcdProcess
+	for i := 0; i < len(epc.Procs); i++ {
+		if i == excludeNode {
+			continue
+		}
+		membs = append(membs, epc.Procs[i])
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	return epc.WaitMembersForLeader(ctx, t, membs)
+}
+
+func assertRevision(t testing.TB, member e2e.EtcdProcess, expectedRevision int64) {
+	responses, err := member.Etcdctl().Status(context.TODO())
+	require.NoError(t, err)
+	assert.Equal(t, expectedRevision, responses[0].Header.Revision, "revision mismatch")
+}
diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go
index 96574849f6f..ed7b403454c 100644
--- a/tests/framework/e2e/cluster.go
+++ b/tests/framework/e2e/cluster.go
@@ -1057,3 +1057,13 @@ func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testi
 	t.Fatal("impossible path of execution")
 	return -1
 }
+
+// MemberId returns the MemberId of the ith Proc in the cluster.
+func (epc *EtcdProcessCluster) MemberId(i int) (uint64, error) {
+	etcdctl := epc.Etcdctl()
+	memberList, err := etcdctl.MemberList(context.Background(), false)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get member list: %w", err)
+	}
+	return findMemberIDByEndpoint(memberList.Members, epc.Procs[i].Config().ClientURL)
+}
diff --git a/tests/framework/e2e/etcdctl.go b/tests/framework/e2e/etcdctl.go
index 0d6bb3a5dcd..d31950b7f61 100644
--- a/tests/framework/e2e/etcdctl.go
+++ b/tests/framework/e2e/etcdctl.go
@@ -663,6 +663,18 @@ func (ctl *EtcdctlV3) RoleDelete(ctx context.Context, role string) (*clientv3.Au
 	return &resp, err
 }
 
+func (ctl *EtcdctlV3) MoveLeader(ctx context.Context, transfereeID uint64) error {
+	args := []string{"move-leader", fmt.Sprintf("%x", transfereeID)}
+	cmd, err := SpawnCmd(ctl.cmdArgs(args...), nil)
+	if err != nil {
+		return err
+	}
+	defer cmd.Close()
+
+	_, err = cmd.ExpectWithContext(ctx, expect.ExpectedResponse{Value: "Leadership transferred"})
+	return err
+}
+
 func (ctl *EtcdctlV3) spawnJSONCmd(ctx context.Context, output any, args ...string) error {
 	args = append(args, "-w", "json")
 	cmd, err := SpawnCmd(append(ctl.cmdArgs(), args...), nil)
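
Usage note: the two framework helpers introduced above, EtcdProcessCluster.MemberId and EtcdctlV3.MoveLeader, are usable on their own by any e2e test that needs leadership on a specific process before injecting a failure. Below is a minimal sketch of how they could be combined, assuming a cluster epc already started via e2e.NewEtcdProcessCluster as in the test above; the helper name moveLeaderTo and the targetIdx parameter are illustrative and not part of this change.

package e2e

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"go.etcd.io/etcd/tests/v3/framework/e2e"
)

// moveLeaderTo is an illustrative helper (not part of the patch): it transfers
// leadership to the member at index targetIdx and waits until the cluster
// reports a leader again.
func moveLeaderTo(ctx context.Context, t *testing.T, epc *e2e.EtcdProcessCluster, targetIdx int) {
	// Map the process index to its cluster-assigned member ID
	// (MemberId is the helper added to cluster.go in this patch).
	id, err := epc.MemberId(targetIdx)
	require.NoError(t, err)

	// Ask the current leader to hand over leadership. MoveLeader (added to
	// etcdctl.go in this patch) runs `etcdctl move-leader <hex-id>` and waits
	// for the "Leadership transferred" output.
	require.NoError(t, epc.Etcdctl().MoveLeader(ctx, id))

	// Block until a leader is reported again.
	epc.WaitLeader(t)
}

snapshotTestByMockingPartition follows this same sequence before blackholing, so that the to-be-partitioned member is the leader first; per the comment in the test, blackholing a non-leader's peer proxy would not cut all of its communication with the other members.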