Skip to content

Commit

Permalink
Fix acquire jobs after session refresh ghalistener (#3307)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikola-jokic authored Feb 27, 2024
1 parent e06c7ed commit 7da2d7f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
8 changes: 4 additions & 4 deletions cmd/ghalistener/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,9 @@ func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*ac

l.logger.Info("Acquiring jobs", "count", len(ids), "requestIds", fmt.Sprint(ids))

ids, err := l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, ids)
idsAcquired, err := l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, ids)
if err == nil { // if NO errors
return ids, nil
return idsAcquired, nil
}

expiredError := &actions.MessageQueueTokenExpiredError{}
Expand All @@ -398,12 +398,12 @@ func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*ac
return nil, err
}

ids, err = l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, ids)
idsAcquired, err = l.client.AcquireJobs(ctx, l.scaleSetID, l.session.MessageQueueAccessToken, ids)
if err != nil {
return nil, fmt.Errorf("failed to acquire jobs after session refresh: %w", err)
}

return ids, nil
return idsAcquired, nil
}

func (l *Listener) refreshSession(ctx context.Context) error {
Expand Down
22 changes: 18 additions & 4 deletions cmd/ghalistener/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,9 +628,6 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()

// First call to AcquireJobs will fail with a token expired error
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()

// Second call to AcquireJobs will succeed
want := []int64{1, 2, 3}
availableJobs := []*actions.JobAvailable{
Expand All @@ -650,7 +647,24 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
},
},
}
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()

// First call to AcquireJobs will fail with a token expired error
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
ids := args.Get(3).([]int64)
assert.Equal(t, want, ids)
}).
Return(nil, &actions.MessageQueueTokenExpiredError{}).
Once()

// First call to AcquireJobs will fail with a token expired error
client.On("AcquireJobs", ctx, mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
ids := args.Get(3).([]int64)
assert.Equal(t, want, ids)
}).
Return(want, nil).
Once()

config.Client = client

Expand Down

0 comments on commit 7da2d7f

Please sign in to comment.