Skip to content

Commit

Permalink
Dustin decker/max concurrent jobs (#126)
Browse files Browse the repository at this point in the history
* Add option to limit maximum concurrent jobs

* move limiter into executor, out of scheduler

* missed an err check in the test

* tweak tests

* fix test comments

* simplify if block

Co-authored-by: streppel <[email protected]>

* handle scheduler stop inside waiting job

* add context to job func to allow cancellation on job removal

Co-authored-by: Dustin <[email protected]>
Co-authored-by: streppel <[email protected]>
  • Loading branch information
3 people authored Feb 25, 2021
1 parent 982d4ef commit f251977
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 21 deletions.
9 changes: 9 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,15 @@ func ExampleScheduler_Seconds() {

}

func ExampleScheduler_SetMaxConcurrentJobs() {
s := gocron.NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(1, gocron.RescheduleMode)
_, _ = s.Every(1).Seconds().Do(func() {
fmt.Println("This will run once every 5 seconds even though it is scheduled every second because maximum concurrent job limit is set.")
time.Sleep(5 * time.Second)
})
}

func ExampleScheduler_SingletonMode() {
s := gocron.NewScheduler(time.UTC)
_, _ = s.Every(1).Second().SingletonMode().Do(task)
Expand Down
63 changes: 60 additions & 3 deletions executor.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,34 @@
package gocron

import (
"context"
"sync"

"golang.org/x/sync/semaphore"
)

const (
// default is that if a limit on maximum concurrent jobs is set
// and the limit is reached, a job will skip it's run and try
// again on the next occurrence in the schedule
RescheduleMode limitMode = iota

// in wait mode if a limit on maximum concurrent jobs is set
// and the limit is reached, a job will wait to try and run
// until a spot in the limit is freed up.
//
// Note: this mode can produce unpredictable results as
// job execution order isn't guaranteed. For example, a job that
// executes frequently may pile up in the wait queue and be executed
// many times back to back when the queue opens.
WaitMode
)

type executor struct {
jobFunctions chan jobFunction
stop chan struct{}
jobFunctions chan jobFunction
stop chan struct{}
limitMode limitMode
maxRunningJobs *semaphore.Weighted
}

func newExecutor() executor {
Expand All @@ -18,25 +40,60 @@ func newExecutor() executor {

func (e *executor) start() {
wg := sync.WaitGroup{}
stopCtx, cancel := context.WithCancel(context.Background())

for {
select {
case f := <-e.jobFunctions:
wg.Add(1)
go func() {
defer wg.Done()

if e.maxRunningJobs != nil {
if !e.maxRunningJobs.TryAcquire(1) {

switch e.limitMode {
case RescheduleMode:
return
case WaitMode:
for {
select {
case <-stopCtx.Done():
return
case <-f.ctx.Done():
return
default:
}

if e.maxRunningJobs.TryAcquire(1) {
break
}
}
}
}

defer e.maxRunningJobs.Release(1)
}

switch f.runConfig.mode {
case defaultMode:
callJobFuncWithParams(f.functions[f.name], f.params[f.name])
case singletonMode:
_, _, _ = f.limiter.Do("main", func() (interface{}, error) {
select {
case <-stopCtx.Done():
return nil, nil
case <-f.ctx.Done():
return nil, nil
default:
}
callJobFuncWithParams(f.functions[f.name], f.params[f.name])
return nil, nil
})
}

}()
case <-e.stop:
cancel()
wg.Wait()
return
}
Expand Down
6 changes: 6 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gocron

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -34,6 +35,8 @@ type jobFunction struct {
name string // the Job name to run, func[jobFunc]
runConfig runConfig // configuration for how many times to run the job
limiter *singleflight.Group // limits inflight runs of job to one
ctx context.Context // for cancellation
cancel context.CancelFunc // for cancellation
}

type runConfig struct {
Expand All @@ -56,13 +59,16 @@ const (

// NewJob creates a new Job with the provided interval
func NewJob(interval int) *Job {
ctx, cancel := context.WithCancel(context.Background())
return &Job{
interval: interval,
lastRun: time.Time{},
nextRun: time.Time{},
jobFunction: jobFunction{
functions: make(map[string]interface{}),
params: make(map[string][]interface{}),
ctx: ctx,
cancel: cancel,
},
tags: []string{},
startsImmediately: true,
Expand Down
18 changes: 15 additions & 3 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import (
"strings"
"sync"
"time"

"golang.org/x/sync/semaphore"
)

type limitMode int8

// Scheduler struct stores a list of Jobs and the location of time Scheduler
// Scheduler implements the sort.Interface{} for sorting Jobs, by the time of nextRun
type Scheduler struct {
Expand All @@ -17,9 +21,8 @@ type Scheduler struct {

locationMutex sync.RWMutex
location *time.Location

runningMutex sync.RWMutex
running bool // represents if the scheduler is running at the moment or not
runningMutex sync.RWMutex
running bool // represents if the scheduler is running at the moment or not

time timeWrapper // wrapper around time.Time
executor *executor // executes jobs passed via chan
Expand All @@ -38,6 +41,13 @@ func NewScheduler(loc *time.Location) *Scheduler {
}
}

// SetMaxConcurrentJobs limits how many jobs can be running at the same time.
// This is useful when running resource intensive jobs and a precise start time is not critical.
func (s *Scheduler) SetMaxConcurrentJobs(n int, mode limitMode) {
s.executor.maxRunningJobs = semaphore.NewWeighted(int64(n))
s.executor.limitMode = mode
}

// StartBlocking starts all jobs and blocks the current thread
func (s *Scheduler) StartBlocking() {
s.StartAsync()
Expand Down Expand Up @@ -395,6 +405,7 @@ func (s *Scheduler) removeByCondition(shouldRemove func(*Job) bool) {
retainedJobs = append(retainedJobs, job)
} else {
job.stopTimer()
job.cancel()
}
}
s.setJobs(retainedJobs)
Expand All @@ -408,6 +419,7 @@ func (s *Scheduler) RemoveByTag(tag string) error {
}
// Remove job if job index is valid
s.jobs[jobindex].stopTimer()
s.jobs[jobindex].cancel()
s.setJobs(removeAtIndex(s.jobs, jobindex))
return nil
}
Expand Down
143 changes: 128 additions & 15 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,26 +1145,41 @@ func TestCalculateMonths(t *testing.T) {
}

func TestScheduler_SingletonMode(t *testing.T) {
t.Run("next run of long running job doesn't overrun", func(t *testing.T) {
//semaphore := make(chan bool)

s := NewScheduler(time.UTC)
var trigger int32
testCases := []struct {
description string
removeJob bool
}{
{"with scheduler stop", false},
{"with job removal", true},
}

_, err := s.Every(1).Second().SingletonMode().Do(func() {
if atomic.LoadInt32(&trigger) == 1 {
t.Fatal("Restart should not occur")
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
var trigger int32

j, err := s.Every(1).Second().SingletonMode().Do(func() {
if atomic.LoadInt32(&trigger) == 1 {
t.Fatal("Restart should not occur")
}
atomic.AddInt32(&trigger, 1)
time.Sleep(3 * time.Second)
})
require.NoError(t, err)

s.StartAsync()
time.Sleep(2 * time.Second)

if tc.removeJob {
s.RemoveByReference(j)
time.Sleep(3 * time.Second)
}
atomic.AddInt32(&trigger, 1)
fmt.Println("I am a long task")
time.Sleep(3 * time.Second)
s.Stop()
})
require.NoError(t, err)
}

s.StartAsync()
time.Sleep(2 * time.Second)
s.Stop()
})
}

func TestScheduler_LimitRunsTo(t *testing.T) {
Expand Down Expand Up @@ -1239,6 +1254,104 @@ func TestScheduler_LimitRunsTo(t *testing.T) {
})
}

func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
semaphore := make(chan bool)

testCases := []struct {
description string
maxConcurrentJobs int
mode limitMode
expectedRuns int
removeJobs bool
f func()
}{
// Expecting a total of 4 job runs:
// 0s - jobs 1 & 3 run, job 2 hits the limit and is skipped
// 1s - job 1 hits the limit and is skipped
// 2s - job 1 & 2 run
// 3s - job 1 hits the limit and is skipped
{"reschedule mode", 2, RescheduleMode, 4, false,
func() {
semaphore <- true
time.Sleep(2 * time.Second)
},
},

// Expecting a total of 8 job runs. The exact order of jobs may vary, for example:
// 0s - jobs 2 & 3 run, job 1 hits the limit and waits
// 1s - job 1 runs twice, the blocked run and the regularly scheduled run
// 2s - jobs 1 & 3 run
// 3s - jobs 2 & 3 run, job 1 hits the limit and waits
{"wait mode", 2, WaitMode, 8, false,
func() {
semaphore <- true
time.Sleep(1 * time.Second)
},
},

// Same as above - this confirms the same behavior when jobs are removed rather than the scheduler being stopped
{"wait mode - with job removal", 2, WaitMode, 8, true,
func() {
semaphore <- true
time.Sleep(1 * time.Second)
},
},
}

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {

s := NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(tc.maxConcurrentJobs, tc.mode)

j1, err := s.Every(1).Second().Do(tc.f)
require.NoError(t, err)

j2, err := s.Every(2).Second().Do(tc.f)
require.NoError(t, err)

j3, err := s.Every(3).Second().Do(tc.f)
require.NoError(t, err)

s.StartAsync()

var counter int

now := time.Now()
for time.Now().Before(now.Add(4 * time.Second)) {
select {
case <-semaphore:
counter++
default:
}
}

if tc.removeJobs {
s.RemoveByReference(j1)
s.RemoveByReference(j2)
s.RemoveByReference(j3)
defer s.Stop()
} else {
s.Stop()
}

// make sure no more jobs are run as the executor
// or job should be properly stopped

now = time.Now()
for time.Now().Before(now.Add(1 * time.Second)) {
select {
case <-semaphore:
counter++
default:
}
}

assert.Equal(t, tc.expectedRuns, counter)
})
}
}

func TestScheduler_RemoveAfterLastRun(t *testing.T) {
t.Run("job removed after the last run", func(t *testing.T) {
semaphore := make(chan bool)
Expand Down

0 comments on commit f251977

Please sign in to comment.