-
Notifications
You must be signed in to change notification settings - Fork 9.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Siyuan Zhang <[email protected]>
- Loading branch information
1 parent
38f3eb3
commit 6088caf
Showing
8 changed files
with
521 additions
and
159 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
// Copyright 2024 The etcd Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package e2e | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.etcd.io/etcd/client/pkg/v3/fileutil" | ||
"go.etcd.io/etcd/tests/v3/framework/e2e" | ||
) | ||
|
||
// TestMixVersionsSnapshotByAddingMember tests the mix version send snapshots by adding member | ||
func TestMixVersionsSnapshotByAddingMember(t *testing.T) { | ||
cases := []struct { | ||
name string | ||
clusterVersion e2e.ClusterVersion | ||
newInstanceVersion e2e.ClusterVersion | ||
}{ | ||
// TODO: uncomment after v3.4.32 release. | ||
// After v3.4.32, etcd can support adding a new member of 3.4 into | ||
// a cluster with 3.5 if the 3.4 member is started with '--next-cluster-version-compatible'. | ||
/*{ | ||
name: "etcd instance with last version receives snapshot from the leader with current version", | ||
clusterVersion: e2e.CurrentVersion, | ||
newInstaceVersion: e2e.LastVersion, | ||
},*/ | ||
{ | ||
name: "etcd instance with current version receives snapshot from the leader with last version", | ||
clusterVersion: e2e.LastVersion, | ||
newInstanceVersion: e2e.CurrentVersion, | ||
}, | ||
} | ||
|
||
for _, tc := range cases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
mixVersionsSnapshotTestByAddingMember(t, tc.clusterVersion, tc.newInstanceVersion) | ||
}) | ||
} | ||
} | ||
|
||
func mixVersionsSnapshotTestByAddingMember(t *testing.T, clusterVersion, newInstanceVersion e2e.ClusterVersion) { | ||
if !fileutil.Exist(e2e.BinPathLastRelease) { | ||
t.Skipf("%q does not exist", e2e.BinPathLastRelease) | ||
} | ||
|
||
e2e.BeforeTest(t) | ||
|
||
// Create an etcd cluster with 1 member | ||
copiedCfg := e2e.NewConfigNoTLS() | ||
copiedCfg.ClusterSize = 1 | ||
copiedCfg.Version = clusterVersion | ||
copiedCfg.SnapshotCount = 10 | ||
copiedCfg.BasePeerScheme = "unix" // to avoid port conflict | ||
|
||
epc, err := e2e.NewEtcdProcessCluster(t, copiedCfg) | ||
|
||
require.NoError(t, err, "failed to start etcd cluster: %v", err) | ||
defer func() { | ||
derr := epc.Close() | ||
require.NoError(t, derr, "failed to close etcd cluster: %v", derr) | ||
}() | ||
|
||
// Write more than SnapshotCount entries to trigger at least a snapshot. | ||
t.Log("Writing 20 keys to the cluster") | ||
|
||
etcdctl := e2e.NewEtcdctl(epc.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) | ||
for i := 0; i < 20; i++ { | ||
key := fmt.Sprintf("key-%d", i) | ||
value := fmt.Sprintf("value-%d", i) | ||
err = etcdctl.Put(key, value) | ||
require.NoError(t, err, "failed to put %q, error: %v", key, err) | ||
} | ||
|
||
// start a new etcd instance, which will receive a snapshot from the leader. | ||
newCfg := *epc.Cfg | ||
newCfg.Version = newInstanceVersion | ||
t.Log("Starting a new etcd instance") | ||
_, err = epc.StartNewProc(&newCfg, t) | ||
require.NoError(t, err, "failed to start the new etcd instance: %v", err) | ||
defer epc.Close() | ||
|
||
// verify all nodes have exact same revision and hash | ||
t.Log("Verify all nodes have exact same revision and hash") | ||
assert.Eventually(t, func() bool { | ||
hashKvs, err := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false).HashKV(0) | ||
if err != nil { | ||
t.Logf("failed to get HashKV: %v", err) | ||
return false | ||
} | ||
if len(hashKvs) != 2 { | ||
t.Logf("expected 2 hashkv responses, but got: %d", len(hashKvs)) | ||
return false | ||
} | ||
|
||
if hashKvs[0].Header.Revision != hashKvs[1].Header.Revision { | ||
t.Logf("Got different revisions, [%d, %d]", hashKvs[0].Header.Revision, hashKvs[1].Header.Revision) | ||
return false | ||
} | ||
|
||
assert.Equal(t, hashKvs[0].Hash, hashKvs[1].Hash) | ||
|
||
return true | ||
}, 10*time.Second, 500*time.Millisecond) | ||
} | ||
|
||
func TestMixVersionsSnapshotByMockingPartition(t *testing.T) { | ||
cases := []struct { | ||
name string | ||
clusterVersion e2e.ClusterVersion | ||
mockPartitionNodeIndex int | ||
}{ | ||
{ | ||
name: "etcd instance with last version receives entries from the leader with current version", | ||
clusterVersion: e2e.MinorityLastVersion, | ||
mockPartitionNodeIndex: 2, | ||
}, | ||
} | ||
|
||
for _, tc := range cases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
mixVersionsSnapshotTestByMockPartition(t, tc.clusterVersion, tc.mockPartitionNodeIndex) | ||
}) | ||
} | ||
} | ||
|
||
func mixVersionsSnapshotTestByMockPartition(t *testing.T, clusterVersion e2e.ClusterVersion, mockPartitionNodeIndex int) { | ||
e2e.BeforeTest(t) | ||
|
||
if !fileutil.Exist(e2e.BinPathLastRelease) { | ||
t.Skipf("%q does not exist", e2e.BinPathLastRelease) | ||
} | ||
|
||
// Create an etcd cluster with 3 member | ||
copiedCfg := e2e.NewConfigNoTLS() | ||
copiedCfg.ClusterSize = 3 | ||
copiedCfg.Version = clusterVersion | ||
copiedCfg.SnapshotCount = 10 | ||
copiedCfg.BasePeerScheme = "unix" // to avoid port conflict | ||
|
||
epc, err := e2e.NewEtcdProcessCluster(t, copiedCfg) | ||
require.NoError(t, err, "failed to start etcd cluster: %v", err) | ||
defer func() { | ||
derr := epc.Close() | ||
require.NoError(t, derr, "failed to close etcd cluster: %v", derr) | ||
}() | ||
toPartitionedMember := epc.Procs[mockPartitionNodeIndex] | ||
|
||
// Stop and restart the partitioned member | ||
err = toPartitionedMember.Stop() | ||
require.NoError(t, err) | ||
|
||
// Write more than SnapshotCount entries to trigger at least a snapshot. | ||
t.Log("Writing 20 keys to the cluster") | ||
etcdctl := e2e.NewEtcdctl(epc.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) | ||
for i := 0; i < 20; i++ { | ||
key := fmt.Sprintf("key-%d", i) | ||
value := fmt.Sprintf("value-%d", i) | ||
err = etcdctl.Put(key, value) | ||
require.NoError(t, err, "failed to put %q, error: %v", key, err) | ||
} | ||
|
||
t.Log("Verify logs to check leader has saved snapshot") | ||
leaderEPC := epc.Procs[epc.WaitLeader(t)] | ||
e2e.AssertProcessLogs(t, leaderEPC, "saved snapshot") | ||
|
||
// Restart the partitioned member | ||
err = toPartitionedMember.Restart() | ||
require.NoError(t, err) | ||
|
||
// verify all nodes have exact same revision and hash | ||
t.Log("Verify all nodes have exact same revision and hash") | ||
assert.Eventually(t, func() bool { | ||
hashKvs, err := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false).HashKV(0) | ||
if err != nil { | ||
t.Logf("failed to get HashKV: %v", err) | ||
return false | ||
} | ||
if len(hashKvs) != 3 { | ||
t.Logf("expected 3 hashkv responses, but got: %d", len(hashKvs)) | ||
return false | ||
} | ||
|
||
if hashKvs[0].Header.Revision != hashKvs[1].Header.Revision { | ||
t.Logf("Got different revisions, [%d, %d]", hashKvs[0].Header.Revision, hashKvs[1].Header.Revision) | ||
return false | ||
} | ||
if hashKvs[1].Header.Revision != hashKvs[2].Header.Revision { | ||
t.Logf("Got different revisions, [%d, %d]", hashKvs[1].Header.Revision, hashKvs[2].Header.Revision) | ||
return false | ||
} | ||
|
||
assert.Equal(t, hashKvs[0].Hash, hashKvs[1].Hash) | ||
assert.Equal(t, hashKvs[1].Hash, hashKvs[2].Hash) | ||
|
||
return true | ||
}, 10*time.Second, 500*time.Millisecond) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.