From dcd4edae17e26e854763121aedc84fec4b1cba75 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 26 Mar 2024 16:29:25 -0500 Subject: [PATCH] fix case where OneTimeJob with concurrent limit and limited runs fails to run (#703) --- scheduler.go | 35 +++++++++++----------- scheduler_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 17 deletions(-) diff --git a/scheduler.go b/scheduler.go index 1c1735c7..dd1323c7 100644 --- a/scheduler.go +++ b/scheduler.go @@ -300,23 +300,6 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { } j.lastScheduledRun = j.nextScheduled - // if the job has a limited number of runs set, we need to - // check how many runs have occurred and stop running this - // job if it has reached the limit. - if j.limitRunsTo != nil { - j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1 - if j.limitRunsTo.runCount == j.limitRunsTo.limit { - go func() { - select { - case <-s.shutdownCtx.Done(): - return - case s.removeJobCh <- id: - } - }() - return - } - } - next := j.next(j.lastScheduledRun) if next.IsZero() { // the job's next function will return zero for OneTime jobs. @@ -356,6 +339,24 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) { if !ok { return } + + // if the job has a limited number of runs set, we need to + // check how many runs have occurred and stop running this + // job if it has reached the limit. + if j.limitRunsTo != nil { + j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1 + if j.limitRunsTo.runCount == j.limitRunsTo.limit { + go func() { + select { + case <-s.shutdownCtx.Done(): + return + case s.removeJobCh <- id: + } + }() + return + } + } + j.lastRun = s.now() s.jobs[id] = j } diff --git a/scheduler_test.go b/scheduler_test.go index 1f922a91..ec69faec 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -1809,6 +1809,81 @@ func TestScheduler_OneTimeJob(t *testing.T) { } } +func TestScheduler_WithLimitedRuns(t *testing.T) { + goleak.VerifyNone(t) + + tests := []struct { + name string + schedulerOpts []SchedulerOption + job JobDefinition + jobOpts []JobOption + runLimit uint + expectedRuns int + }{ + { + "simple", + nil, + DurationJob(time.Millisecond * 100), + nil, + 1, + 1, + }, + { + "OneTimeJob, WithLimitConcurrentJobs", + []SchedulerOption{ + WithLimitConcurrentJobs(1, LimitModeWait), + }, + OneTimeJob(OneTimeJobStartImmediately()), + nil, + 1, + 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t, tt.schedulerOpts...) + + jobRan := make(chan struct{}, 10) + + jobOpts := []JobOption{ + WithLimitedRuns(tt.runLimit), + } + jobOpts = append(jobOpts, tt.jobOpts...) + + _, err := s.NewJob( + tt.job, + NewTask(func() { + jobRan <- struct{}{} + }), + jobOpts..., + ) + require.NoError(t, err) + + s.Start() + time.Sleep(time.Millisecond * 150) + + assert.NoError(t, s.Shutdown()) + + var runCount int + for runCount < tt.expectedRuns { + select { + case <-jobRan: + runCount++ + case <-time.After(time.Second): + t.Fatal("timed out waiting for job to run") + } + } + select { + case <-jobRan: + t.Fatal("job ran more than expected") + default: + } + assert.Equal(t, tt.expectedRuns, runCount) + }) + } +} + func TestScheduler_Jobs(t *testing.T) { tests := []struct { name string