diff --git a/example_test.go b/example_test.go
index 2307dd92..61c1bf48 100644
--- a/example_test.go
+++ b/example_test.go
@@ -450,6 +450,15 @@ func ExampleScheduler_Seconds() {
 }
 
+func ExampleScheduler_SetMaxConcurrentJobs() {
+	s := gocron.NewScheduler(time.UTC)
+	s.SetMaxConcurrentJobs(1, gocron.RescheduleMode)
+	_, _ = s.Every(1).Seconds().Do(func() {
+		fmt.Println("This will run once every 5 seconds even though it is scheduled every second, because the maximum concurrent jobs limit is set to 1.")
+		time.Sleep(5 * time.Second)
+	})
+}
+
 func ExampleScheduler_SingletonMode() {
 	s := gocron.NewScheduler(time.UTC)
 	_, _ = s.Every(1).Second().SingletonMode().Do(task)
diff --git a/executor.go b/executor.go
index e0b5af83..077a90e2 100644
--- a/executor.go
+++ b/executor.go
@@ -1,12 +1,34 @@
 package gocron
 
 import (
+	"context"
 	"sync"
+
+	"golang.org/x/sync/semaphore"
+)
+
+const (
+	// default is that if a limit on maximum concurrent jobs is set
+	// and the limit is reached, a job will skip its run and try
+	// again on the next occurrence in the schedule
+	RescheduleMode limitMode = iota
+
+	// in wait mode, if a limit on maximum concurrent jobs is set
+	// and the limit is reached, a job will wait to try and run
+	// until a spot in the limit is freed up.
+	//
+	// Note: this mode can produce unpredictable results, as
+	// job execution order isn't guaranteed. For example, a job that
+	// executes frequently may pile up in the wait queue and be executed
+	// many times back to back when the queue opens.
+	WaitMode
 )
 
 type executor struct {
-	jobFunctions chan jobFunction
-	stop         chan struct{}
+	jobFunctions   chan jobFunction
+	stop           chan struct{}
+	limitMode      limitMode
+	maxRunningJobs *semaphore.Weighted
 }
 
 func newExecutor() executor {
@@ -18,6 +40,8 @@ func newExecutor() executor {
 
 func (e *executor) start() {
 	wg := sync.WaitGroup{}
+	stopCtx, cancel := context.WithCancel(context.Background())
+
 	for {
 		select {
 		case f := <-e.jobFunctions:
@@ -25,18 +49,51 @@
 			go func() {
 				defer wg.Done()
 
+				if e.maxRunningJobs != nil {
+					if !e.maxRunningJobs.TryAcquire(1) {
+
+						switch e.limitMode {
+						case RescheduleMode:
+							return
+						case WaitMode:
+							for {
+								select {
+								case <-stopCtx.Done():
+									return
+								case <-f.ctx.Done():
+									return
+								default:
+								}
+
+								if e.maxRunningJobs.TryAcquire(1) {
+									break
+								}
+							}
+						}
+					}
+
+					defer e.maxRunningJobs.Release(1)
+				}
+
 				switch f.runConfig.mode {
 				case defaultMode:
 					callJobFuncWithParams(f.functions[f.name], f.params[f.name])
 				case singletonMode:
 					_, _, _ = f.limiter.Do("main", func() (interface{}, error) {
+						select {
+						case <-stopCtx.Done():
+							return nil, nil
+						case <-f.ctx.Done():
+							return nil, nil
+						default:
+						}
 						callJobFuncWithParams(f.functions[f.name], f.params[f.name])
 						return nil, nil
 					})
 				}
-
 			}()
 		case <-e.stop:
+			cancel()
 			wg.Wait()
 			return
 		}
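The limiting logic in the executor hunk above reduces to a small semaphore pattern: try to acquire a slot, and on failure either drop the run (reschedule) or poll until a slot frees up or cancellation fires (wait). A minimal, self-contained sketch of that reading, not part of the patch; runLimited and its parameters are illustrative names:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

// runLimited mirrors the executor's check: a run that cannot acquire
// the semaphore is dropped in reschedule mode, or polls for a slot in
// wait mode until the context is canceled.
func runLimited(ctx context.Context, sem *semaphore.Weighted, wait bool, task func()) {
	if !sem.TryAcquire(1) {
		if !wait {
			return // reschedule mode: skip this run entirely
		}
		for {
			select {
			case <-ctx.Done():
				return // scheduler stopped or job removed
			default:
			}
			if sem.TryAcquire(1) {
				break
			}
		}
	}
	defer sem.Release(1)
	task()
}

func main() {
	sem := semaphore.NewWeighted(1)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			runLimited(context.Background(), sem, false, func() {
				fmt.Println("run", i)
				time.Sleep(100 * time.Millisecond)
			})
		}()
	}
	wg.Wait() // with weight 1 and reschedule semantics, typically only one "run" prints
}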
diff --git a/job.go b/job.go
index b2d18d86..1ce4603d 100644
--- a/job.go
+++ b/job.go
@@ -1,6 +1,7 @@
 package gocron
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
@@ -34,6 +35,8 @@ type jobFunction struct {
 	name      string              // the Job name to run, func[jobFunc]
 	runConfig runConfig           // configuration for how many times to run the job
 	limiter   *singleflight.Group // limits inflight runs of job to one
+	ctx       context.Context     // for cancellation
+	cancel    context.CancelFunc  // for cancellation
 }
 
 type runConfig struct {
@@ -56,6 +59,7 @@ const (
 
 // NewJob creates a new Job with the provided interval
 func NewJob(interval int) *Job {
+	ctx, cancel := context.WithCancel(context.Background())
 	return &Job{
 		interval: interval,
 		lastRun:  time.Time{},
@@ -63,6 +67,8 @@ func NewJob(interval int) *Job {
 		jobFunction: jobFunction{
 			functions: make(map[string]interface{}),
 			params:    make(map[string][]interface{}),
+			ctx:       ctx,
+			cancel:    cancel,
 		},
 		tags:              []string{},
 		startsImmediately: true,
diff --git a/scheduler.go b/scheduler.go
index e846fdc5..62c1522c 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -7,8 +7,12 @@ import (
 	"strings"
 	"sync"
 	"time"
+
+	"golang.org/x/sync/semaphore"
 )
 
+type limitMode int8
+
 // Scheduler struct stores a list of Jobs and the location of time Scheduler
 // Scheduler implements the sort.Interface{} for sorting Jobs, by the time of nextRun
 type Scheduler struct {
@@ -17,9 +21,8 @@ type Scheduler struct {
 	locationMutex sync.RWMutex
 	location      *time.Location
-
-	runningMutex sync.RWMutex
-	running      bool // represents if the scheduler is running at the moment or not
+	runningMutex  sync.RWMutex
+	running       bool // represents if the scheduler is running at the moment or not
 
 	time     timeWrapper // wrapper around time.Time
 	executor *executor   // executes jobs passed via chan
@@ -38,6 +41,13 @@ func NewScheduler(loc *time.Location) *Scheduler {
 	}
 }
 
+// SetMaxConcurrentJobs limits how many jobs can be running at the same time.
+// This is useful when running resource-intensive jobs and a precise start time is not critical.
+func (s *Scheduler) SetMaxConcurrentJobs(n int, mode limitMode) {
+	s.executor.maxRunningJobs = semaphore.NewWeighted(int64(n))
+	s.executor.limitMode = mode
+}
+
 // StartBlocking starts all jobs and blocks the current thread
 func (s *Scheduler) StartBlocking() {
 	s.StartAsync()
@@ -395,6 +405,7 @@ func (s *Scheduler) removeByCondition(shouldRemove func(*Job) bool) {
 			retainedJobs = append(retainedJobs, job)
 		} else {
 			job.stopTimer()
+			job.cancel()
 		}
 	}
 	s.setJobs(retainedJobs)
@@ -408,6 +419,7 @@ func (s *Scheduler) RemoveByTag(tag string) error {
 	}
 	// Remove job if job index is valid
 	s.jobs[jobindex].stopTimer()
+	s.jobs[jobindex].cancel()
 	s.setJobs(removeAtIndex(s.jobs, jobindex))
 	return nil
 }
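For callers, the new entry point looks like this. A sketch assuming the patched library under the go-co-op/gocron module path, exercising WaitMode, since the RescheduleMode case is already covered by the new ExampleScheduler_SetMaxConcurrentJobs above:

package main

import (
	"fmt"
	"time"

	"github.com/go-co-op/gocron"
)

func main() {
	s := gocron.NewScheduler(time.UTC)
	// Allow at most 2 jobs to run at once; a job that hits the limit
	// waits for a free slot instead of skipping its run.
	s.SetMaxConcurrentJobs(2, gocron.WaitMode)

	for i := 0; i < 4; i++ {
		i := i
		_, _ = s.Every(1).Second().Do(func() {
			fmt.Println("job", i, "started at", time.Now().Format("15:04:05"))
			time.Sleep(2 * time.Second)
		})
	}

	s.StartAsync()
	time.Sleep(6 * time.Second)
	s.Stop()
}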
diff --git a/scheduler_test.go b/scheduler_test.go
index edba3be2..e045b08f 100644
--- a/scheduler_test.go
+++ b/scheduler_test.go
@@ -1145,26 +1145,41 @@ func TestCalculateMonths(t *testing.T) {
 }
 
 func TestScheduler_SingletonMode(t *testing.T) {
-	t.Run("next run of long running job doesn't overrun", func(t *testing.T) {
-		//semaphore := make(chan bool)
-		s := NewScheduler(time.UTC)
-		var trigger int32
+	testCases := []struct {
+		description string
+		removeJob   bool
+	}{
+		{"with scheduler stop", false},
+		{"with job removal", true},
+	}
 
-		_, err := s.Every(1).Second().SingletonMode().Do(func() {
-			if atomic.LoadInt32(&trigger) == 1 {
-				t.Fatal("Restart should not occur")
+	for _, tc := range testCases {
+		t.Run(tc.description, func(t *testing.T) {
+
+			s := NewScheduler(time.UTC)
+			var trigger int32
+
+			j, err := s.Every(1).Second().SingletonMode().Do(func() {
+				if atomic.LoadInt32(&trigger) == 1 {
+					t.Fatal("Restart should not occur")
+				}
+				atomic.AddInt32(&trigger, 1)
+				time.Sleep(3 * time.Second)
+			})
+			require.NoError(t, err)
+
+			s.StartAsync()
+			time.Sleep(2 * time.Second)
+
+			if tc.removeJob {
+				s.RemoveByReference(j)
+				time.Sleep(3 * time.Second)
 			}
-			atomic.AddInt32(&trigger, 1)
-			fmt.Println("I am a long task")
-			time.Sleep(3 * time.Second)
+			s.Stop()
 		})
-		require.NoError(t, err)
+	}
 
-		s.StartAsync()
-		time.Sleep(2 * time.Second)
-		s.Stop()
-	})
 }
 
 func TestScheduler_LimitRunsTo(t *testing.T) {
@@ -1239,6 +1254,104 @@
 	})
 }
 
+func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
+	semaphore := make(chan bool)
+
+	testCases := []struct {
+		description       string
+		maxConcurrentJobs int
+		mode              limitMode
+		expectedRuns      int
+		removeJobs        bool
+		f                 func()
+	}{
+		// Expecting a total of 4 job runs:
+		// 0s - jobs 1 & 3 run, job 2 hits the limit and is skipped
+		// 1s - job 1 hits the limit and is skipped
+		// 2s - jobs 1 & 2 run
+		// 3s - job 1 hits the limit and is skipped
+		{"reschedule mode", 2, RescheduleMode, 4, false,
+			func() {
+				semaphore <- true
+				time.Sleep(2 * time.Second)
+			},
+		},
+
+		// Expecting a total of 8 job runs. The exact order of jobs may vary, for example:
+		// 0s - jobs 2 & 3 run, job 1 hits the limit and waits
+		// 1s - job 1 runs twice, the blocked run and the regularly scheduled run
+		// 2s - jobs 1 & 3 run
+		// 3s - jobs 2 & 3 run, job 1 hits the limit and waits
+		{"wait mode", 2, WaitMode, 8, false,
+			func() {
+				semaphore <- true
+				time.Sleep(1 * time.Second)
+			},
+		},
+
+		// Same as above - this confirms the same behavior when jobs are
+		// removed rather than the scheduler being stopped
+		{"wait mode - with job removal", 2, WaitMode, 8, true,
+			func() {
+				semaphore <- true
+				time.Sleep(1 * time.Second)
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.description, func(t *testing.T) {
+
+			s := NewScheduler(time.UTC)
+			s.SetMaxConcurrentJobs(tc.maxConcurrentJobs, tc.mode)
+
+			j1, err := s.Every(1).Second().Do(tc.f)
+			require.NoError(t, err)
+
+			j2, err := s.Every(2).Second().Do(tc.f)
+			require.NoError(t, err)
+
+			j3, err := s.Every(3).Second().Do(tc.f)
+			require.NoError(t, err)
+
+			s.StartAsync()
+
+			var counter int
+
+			now := time.Now()
+			for time.Now().Before(now.Add(4 * time.Second)) {
+				select {
+				case <-semaphore:
+					counter++
+				default:
+				}
+			}
+
+			if tc.removeJobs {
+				s.RemoveByReference(j1)
+				s.RemoveByReference(j2)
+				s.RemoveByReference(j3)
+				defer s.Stop()
+			} else {
+				s.Stop()
+			}
+
+			// make sure no more jobs run, as the executor
+			// and the jobs should have been properly stopped
+			now = time.Now()
+			for time.Now().Before(now.Add(1 * time.Second)) {
+				select {
+				case <-semaphore:
+					counter++
+				default:
+				}
+			}
+
+			assert.Equal(t, tc.expectedRuns, counter)
+		})
+	}
+}
+
 func TestScheduler_RemoveAfterLastRun(t *testing.T) {
 	t.Run("job removed after the last run", func(t *testing.T) {
 		semaphore := make(chan bool)
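A note on the cancellation plumbing that the tests above exercise: each Job now owns a context whose cancel func fires on removal (via removeByCondition and RemoveByTag), which is what lets a WaitMode run blocked in the executor's polling loop bail out instead of running after its job is gone. The pattern in isolation, as a sketch with illustrative names:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Stand-in for the job.ctx / job.cancel pair created in NewJob.
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		for { // stand-in for the executor's WaitMode polling loop
			select {
			case <-ctx.Done():
				fmt.Println("job removed; abandoning the waiting run")
				return
			default:
			}
			time.Sleep(10 * time.Millisecond) // keep polling for a semaphore slot
		}
	}()

	time.Sleep(50 * time.Millisecond)
	cancel() // what job removal now triggers via job.cancel()
	time.Sleep(50 * time.Millisecond)
}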