Skip to content

Commit

Permalink
fix tests hanging
Browse files Browse the repository at this point in the history
Signed-off-by: Cassandra Coyle <[email protected]>
  • Loading branch information
cicoyle committed Nov 25, 2024
1 parent fce786f commit 8fbc719
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 29 deletions.
69 changes: 42 additions & 27 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ func Test_Run(t *testing.T) {
cron := cronI.(*cron)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh1 := make(chan error)
errCh2 := make(chan error)

go func() {
errCh1 <- cronI.Run(ctx)
close(errCh1)
}()

select {
Expand All @@ -64,6 +66,7 @@ func Test_Run(t *testing.T) {

go func() {
errCh2 <- cronI.Run(ctx)
close(errCh2)
}()

select {
Expand Down Expand Up @@ -109,6 +112,7 @@ func Test_Run(t *testing.T) {

go func() {
errCh <- cronI.Run(ctx)
close(errCh)
}()

// wait until ready
Expand Down Expand Up @@ -159,7 +163,7 @@ func Test_Run(t *testing.T) {
cronI, err := New(Options{
Log: logr.Discard(),
Client: client,
Namespace: "abc",
Namespace: "abcd",
PartitionID: 0,
PartitionTotal: 10,
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
Expand All @@ -176,6 +180,7 @@ func Test_Run(t *testing.T) {

go func() {
errCh <- cronI.Run(ctx)
close(errCh)
}()

cron.lock.RLock()
Expand Down Expand Up @@ -219,12 +224,12 @@ func Test_Run(t *testing.T) {
}
}(childCtx)

_, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData))
_, err = client.Put(context.Background(), "abcd/leadership/1", string(leadershipData))
require.NoError(t, err, "failed to insert leadership data into etcd")

select {
case <-restartingCh:
case <-time.After(1 * time.Second):
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}

Expand Down Expand Up @@ -264,6 +269,7 @@ func Test_Run(t *testing.T) {

go func() {
errCh <- cronI.Run(ctx)
close(errCh)
}()

cron.lock.RLock()
Expand Down Expand Up @@ -312,18 +318,18 @@ func Test_Run(t *testing.T) {

select {
case <-restartingCh:
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for cron to be restarting")
}

cron.lock.Lock()
cron.lock.RLock()
newReadyCh := cron.readyCh
cron.lock.Unlock()
cron.lock.RUnlock()

select {
case <-newReadyCh:
t.Fatal("cron should not be ready while restarting")
case <-time.After(2 * time.Second):
case <-time.After(5 * time.Second):
}

cancel()
Expand All @@ -345,7 +351,7 @@ func Test_Run(t *testing.T) {
cronI, err := New(Options{
Log: logr.Discard(),
Client: client,
Namespace: "abc",
Namespace: "abcde",
PartitionID: 0,
PartitionTotal: 10,
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
Expand All @@ -362,6 +368,7 @@ func Test_Run(t *testing.T) {

go func() {
errCh <- cronI.Run(ctx)
close(errCh)
}()

cron.lock.RLock()
Expand Down Expand Up @@ -406,19 +413,19 @@ func Test_Run(t *testing.T) {
}
}(childCtx)

_, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData))
_, err = client.Put(context.Background(), "abcde/leadership/1", string(leadershipData))
require.NoError(t, err, "failed to insert leadership data into etcd")

// cron engine restarting
select {
case <-restartingCh:
case <-time.After(1 * time.Second):
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}

cron.lock.Lock()
cron.lock.RLock()
newReadyCh := cron.readyCh
cron.lock.Unlock()
cron.lock.RUnlock()

select {
case <-newReadyCh:
Expand Down Expand Up @@ -463,7 +470,7 @@ func Test_Run(t *testing.T) {
cronI, err := New(Options{
Log: logr.Discard(),
Client: client,
Namespace: "abc",
Namespace: "abcdef",
PartitionID: 0,
PartitionTotal: 10,
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
Expand All @@ -480,6 +487,7 @@ func Test_Run(t *testing.T) {

go func() {
errCh <- cronI.Run(ctx)
close(errCh)
}()

cron.lock.RLock()
Expand All @@ -493,13 +501,6 @@ func Test_Run(t *testing.T) {
t.Fatal("timed out waiting for cron to be ready")
}

// intentionally incorrect total
leadershipData, err := proto.Marshal(&stored.Leadership{
Total: 5,
ReplicaData: replicaData,
})
require.NoError(t, err, "failed to marshal leadership data")

childCtx, childCancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -524,18 +525,32 @@ func Test_Run(t *testing.T) {
}
}(childCtx)

_, err = client.Put(context.Background(), "abc/leadership/1", string(leadershipData))
// wait until ready
select {
case <-readyCh:
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}

// intentionally incorrect total
leadershipData, err := proto.Marshal(&stored.Leadership{
Total: 5,
ReplicaData: replicaData,
})
require.NoError(t, err, "failed to marshal leadership data")

_, err = client.Put(context.Background(), "abcdef/leadership/1", string(leadershipData))
require.NoError(t, err, "failed to insert leadership data into etcd")

select {
case <-restartingCh:
case <-time.After(1 * time.Second):
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for cron to be ready")
}

cron.lock.Lock()
cron.lock.RLock()
newReadyCh := cron.readyCh
cron.lock.Unlock()
cron.lock.RUnlock()

select {
case <-newReadyCh:
Expand All @@ -557,7 +572,7 @@ func Test_Run(t *testing.T) {
})
require.NoError(t, err)

_, err = client.Put(context.Background(), "abc/leadership/1", string(correctLeadershipData))
_, err = client.Put(context.Background(), "abcdef/leadership/1", string(correctLeadershipData))
require.NoError(t, err, "failed to update leadership data in etcd")

// Wait for the cron to become ready again
Expand All @@ -567,7 +582,7 @@ func Test_Run(t *testing.T) {

select {
case <-finalReadyCh:
case <-time.After(2 * time.Second):
case <-time.After(5 * time.Second):
t.Fatal("cron did not become ready again after restart")
}

Expand Down
8 changes: 6 additions & 2 deletions internal/leadership/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ func (l *Leadership) checkLeadershipKeys(ctx context.Context) (bool, error) {
l.lock.Lock()
defer l.lock.Unlock()

resp, err := l.client.Get(ctx, l.key.LeadershipKey())
getKeyCtx, getKeyCancel := context.WithTimeout(ctx, 5*time.Second)
defer getKeyCancel()
resp, err := l.client.Get(getKeyCtx, l.key.LeadershipKey())
if err != nil {
return false, err
}
Expand All @@ -253,7 +255,9 @@ func (l *Leadership) checkLeadershipKeys(ctx context.Context) (bool, error) {
return false, errors.New("lost partition leadership key")
}

resp, err = l.client.Get(ctx, l.key.LeadershipNamespace(), clientv3.WithPrefix())
getNSCtx, getNSCancel := context.WithTimeout(ctx, 5*time.Second)
defer getNSCancel()
resp, err = l.client.Get(getNSCtx, l.key.LeadershipNamespace(), clientv3.WithPrefix())
if err != nil {
return false, err
}
Expand Down

0 comments on commit 8fbc719

Please sign in to comment.