add scale leadership test for rep data + partition total updates
Signed-off-by: Cassandra Coyle <[email protected]>
cicoyle committed Dec 6, 2024
1 parent 1ed105d commit 116250e
Showing 1 changed file with 105 additions and 0 deletions.
internal/leadership/leadership_test.go: 105 additions, 0 deletions
@@ -17,6 +17,7 @@ import (
"google.golang.org/genproto/googleapis/type/expr"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/diagridio/go-etcd-cron/internal/api/stored"
"github.com/diagridio/go-etcd-cron/internal/client"
@@ -841,6 +842,110 @@ func Test_Run(t *testing.T) {
require.NoError(t, err, "Leadership should be acquired now since excess keys were removed")
}
})

t.Run("Simulate dynamic scaling while updating replication data and partition total", func(t *testing.T) {
client := etcd.Embedded(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

		// single instance
		partitionTotal := uint32(1)
		initialReplicaData := wrapperspb.Bytes([]byte("one-data"))
		replicaData, err := anypb.New(initialReplicaData)
		require.NoError(t, err)

		leader := New(Options{
			Client:         client,
			PartitionTotal: partitionTotal,
			Key: key.New(key.Options{
				Namespace:   "abc",
				PartitionID: 0,
			}),
			ReplicaData: replicaData,
		})
		singleCtx, singleCancel := context.WithCancel(ctx)
		go func() { _ = leader.Run(singleCtx) }()

		_, err = leader.WaitForLeadership(ctx)
		require.NoError(t, err)

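		// the leadership key for partition 0 should carry the initial replica data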
		resp, err := client.Get(ctx, "abc/leadership/0")
		require.NoError(t, err)
		require.Contains(t, string(resp.Kvs[0].Value), "one-data")

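		// shut down the single instance before scaling out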
		singleCancel()

		// 3 instances & new replication data
		partitionTotal = 3
		var leaders []*Leadership
		threeCtx, threeCancel := context.WithCancel(ctx)
		newReplicaData := wrapperspb.Bytes([]byte("three-data"))
		replicaData, err = anypb.New(newReplicaData)
		require.NoError(t, err)

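		// start one leadership instance per partition with the new replica data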
		for i := uint32(0); i < partitionTotal; i++ {
			leader := New(Options{
				Client:         client,
				PartitionTotal: partitionTotal,
				Key: key.New(key.Options{
					Namespace:   "abc",
					PartitionID: i,
				}),
				ReplicaData: replicaData,
			})
			leaders = append(leaders, leader)

			go func(l *Leadership) { _ = l.Run(threeCtx) }(leader)
		}

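		// wait for every instance to acquire leadership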
		for _, leader := range leaders {
			_, err := leader.WaitForLeadership(ctx)
			require.NoError(t, err)
		}

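		// each partition's leadership key should now hold the updated replica data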
		for i := uint32(0); i < partitionTotal; i++ {
			resp, err := client.Get(ctx, fmt.Sprintf("abc/leadership/%d", i))
			require.NoError(t, err)
			require.Contains(t, string(resp.Kvs[0].Value), "three-data", "Replication data mismatch for leader %d", i)
		}

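		// tear down the three-instance deployment before scaling to five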
		threeCancel()
		leaders = nil

		// 5 instances & new replication data
		partitionTotal = 5
		fiveCtx, fiveCancel := context.WithCancel(ctx)
		defer fiveCancel()

		newReplicaData = wrapperspb.Bytes([]byte("five-data"))
		replicaData, err = anypb.New(newReplicaData)
		require.NoError(t, err)

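		// start one leadership instance per partition, now with partitionTotal = 5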
		for i := uint32(0); i < partitionTotal; i++ {
			leader := New(Options{
				Client:         client,
				PartitionTotal: partitionTotal,
				Key: key.New(key.Options{
					Namespace:   "abc",
					PartitionID: i,
				}),
				ReplicaData: replicaData,
			})
			leaders = append(leaders, leader)
			go func(l *Leadership) { _ = l.Run(fiveCtx) }(leader)
		}

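		// wait for every instance to acquire leadership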
		for _, leader := range leaders {
			_, err := leader.WaitForLeadership(ctx)
			require.NoError(t, err)
		}

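		// each partition's leadership key should now hold the latest replica data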
		for i := uint32(0); i < partitionTotal; i++ {
			resp, err := client.Get(ctx, fmt.Sprintf("abc/leadership/%d", i))
			require.NoError(t, err)
			require.Contains(t, string(resp.Kvs[0].Value), "five-data", "Replication data mismatch for leader %d", i)
		}
	})
}

func Test_checkLeadershipKeys(t *testing.T) {
