Skip to content

Commit

Permalink
Farwydi master single flight mode (#123)
Browse files Browse the repository at this point in the history
* feat: Add Singleton mode in job

This mode prevents double start.

* update readme

Co-authored-by: Leonid Zharikov <[email protected]>
  • Loading branch information
JohnRoesler and farwydi authored Feb 21, 2021
1 parent 52c7231 commit 982d4ef
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 22 deletions.
28 changes: 17 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,32 @@ If you want to chat, you can find us at Slack! [<img src="https://img.shields.io

## Examples

Take a look in our [go docs](https://pkg.go.dev/github.com/go-co-op/gocron#pkg-examples)
```golang
s := gocron.NewScheduler(time.UTC)

s.Every(5).Seconds().Do(func(){ ... })
```

For more examples, take a look in our [go docs](https://pkg.go.dev/github.com/go-co-op/gocron#pkg-examples)

## FAQ

* Q: I'm running multiple pods on a distributed environment. How can I make a job not run once per pod causing duplication?
* Q: I'm running multiple pods on a distributed environment. How can I make a job not run once per pod causing duplication?
* A: We recommend using your own lock solution within the jobs themselves (you could use [Redis](https://redis.io/topics/distlock), for example)

* Q: I've removed my job from the scheduler, but how can I stop a long-running job that has already been triggered?
* A: We recommend using a means of canceling your job, e.g. a `context.WithCancel()`.

---
Looking to contribute? Try to follow these guidelines:
* Use issues for everything
* For a small change, just send a PR!
* For bigger changes, please open an issue for discussion before sending a PR.
* PRs should have: tests, documentation and examples (if it makes sense)
* You can also contribute by:
* Reporting issues
* Suggesting new features or enhancements
* Improving/fixing documentation
* Use issues for everything
* For a small change, just send a PR!
* For bigger changes, please open an issue for discussion before sending a PR.
* PRs should have: tests, documentation and examples (if it makes sense)
* You can also contribute by:
* Reporting issues
* Suggesting new features or enhancements
* Improving/fixing documentation
---

[Jetbrains](https://www.jetbrains.com/?from=gocron) supports this project with GoLand licenses. We appreciate their support for free and open source software!
16 changes: 16 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ func ExampleJob_ScheduledTime() {
fmt.Println(job.ScheduledTime())
}

func ExampleJob_SingletonMode() {
s := gocron.NewScheduler(time.UTC)
job, _ := s.Every(1).Second().Do(task)
job.SingletonMode()
}

func ExampleJob_Tag() {
s := gocron.NewScheduler(time.UTC)
job, _ := s.Every("1s").Do(task)
Expand Down Expand Up @@ -427,6 +433,8 @@ func ExampleScheduler_Second() {
_, _ = s.Every(1).Do(task)
_, _ = s.Every(1).Second().Do(task)
_, _ = s.Every(1).Seconds().Do(task)
_, _ = s.Every("1s").Seconds().Do(task)
_, _ = s.Every(time.Second).Seconds().Do(task)
}

func ExampleScheduler_Seconds() {
Expand All @@ -437,6 +445,14 @@ func ExampleScheduler_Seconds() {
_, _ = s.Every(1).Do(task)
_, _ = s.Every(1).Second().Do(task)
_, _ = s.Every(1).Seconds().Do(task)
_, _ = s.Every("1s").Seconds().Do(task)
_, _ = s.Every(time.Second).Seconds().Do(task)

}

func ExampleScheduler_SingletonMode() {
s := gocron.NewScheduler(time.UTC)
_, _ = s.Every(1).Second().SingletonMode().Do(task)
}

func ExampleScheduler_StartBlocking() {
Expand Down
12 changes: 11 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@ func (e *executor) start() {
wg.Add(1)
go func() {
defer wg.Done()
callJobFuncWithParams(f.functions[f.name], f.params[f.name])

switch f.runConfig.mode {
case defaultMode:
callJobFuncWithParams(f.functions[f.name], f.params[f.name])
case singletonMode:
_, _, _ = f.limiter.Do("main", func() (interface{}, error) {
callJobFuncWithParams(f.functions[f.name], f.params[f.name])
return nil, nil
})
}

}()
case <-e.stop:
wg.Wait()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ go 1.15
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
39 changes: 33 additions & 6 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"sync"
"time"

"golang.org/x/sync/singleflight"
)

// Job struct stores the information necessary to run a Job
Expand All @@ -22,7 +24,6 @@ type Job struct {
scheduledWeekday *time.Weekday // Specific day of the week to start on
dayOfTheMonth int // Specific day of the month to run the job
tags []string // allow the user to tag Jobs with certain labels
runConfig runConfig // configuration for how many times to run the job
runCount int // number of times the job ran
timer *time.Timer
}
Expand All @@ -31,14 +32,28 @@ type jobFunction struct {
functions map[string]interface{} // Map for the function task store
params map[string][]interface{} // Map for function and params of function
name string // the Job name to run, func[jobFunc]
runConfig runConfig // configuration for how many times to run the job
limiter *singleflight.Group // limits inflight runs of job to one
}

type runConfig struct {
finiteRuns bool
maxRuns int
mode mode
removeAfterLastRun bool
}

// mode is the Job's running mode
type mode int8

const (
// defaultMode disable any mode
defaultMode mode = iota

// singletonMode switch to single job mode
singletonMode
)

// NewJob creates a new Job with the provided interval
func NewJob(interval int) *Job {
return &Job{
Expand Down Expand Up @@ -137,18 +152,30 @@ func (j *Job) Weekday() (time.Weekday, error) {

// LimitRunsTo limits the number of executions of this job to n.
// The job will remain in the scheduler.
// Note: If a job is added to a running scheduler and this method is used
// Note: If a job is added to a running scheduler and this method is then used
// you may see the job run more than the set limit as job is scheduled immediately
// by default upon being added to the scheduler. It is recommended to use the
// LimitRunsTo() func on the scheduler chain when scheduling the job.
// For example: scheduler.LimitRunsTo(1).Do()
func (j *Job) LimitRunsTo(n int) {
j.Lock()
defer j.Unlock()
j.runConfig = runConfig{
finiteRuns: true,
maxRuns: n,
}
j.runConfig.finiteRuns = true
j.runConfig.maxRuns = n
}

// SingletonMode prevents a new job from starting if the prior job has not yet
// completed it's run
// Note: If a job is added to a running scheduler and this method is then used
// you may see the job run overrun itself as job is scheduled immediately
// by default upon being added to the scheduler. It is recommended to use the
// SingletonMode() func on the scheduler chain when scheduling the job.
func (j *Job) SingletonMode() {
j.Lock()
defer j.Unlock()
j.runConfig.mode = singletonMode
j.jobFunction.limiter = &singleflight.Group{}

}

// RemoveAfterLastRun sets the job to be removed after it's last run (when limited)
Expand Down
6 changes: 4 additions & 2 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ func TestJob_shouldRunAgain(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
j := &Job{
runConfig: tt.runConfig,
runCount: tt.runCount,
jobFunction: jobFunction{
runConfig: tt.runConfig,
},
runCount: tt.runCount,
}
if got := j.shouldRun(); got != tt.want {
t.Errorf("Job.shouldRunAgain() = %v, want %v", got, tt.want)
Expand Down
10 changes: 10 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ func (s *Scheduler) run(job *Job) {
return
}

job.Lock()
defer job.Unlock()
job.setLastRun(s.now())
job.runCount++
s.executor.jobFunctions <- job.jobFunction
Expand Down Expand Up @@ -436,6 +438,14 @@ func (s *Scheduler) LimitRunsTo(i int) *Scheduler {
return s
}

// SingletonMode prevents a new job from starting if the prior job has not yet
// completed it's run
func (s *Scheduler) SingletonMode() *Scheduler {
job := s.getCurrentJob()
job.SingletonMode()
return s
}

// RemoveAfterLastRun sets the job to be removed after it's last run (when limited)
func (s *Scheduler) RemoveAfterLastRun() *Scheduler {
job := s.getCurrentJob()
Expand Down
27 changes: 25 additions & 2 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var _ timeWrapper = (*fakeTime)(nil)
Expand Down Expand Up @@ -1144,6 +1144,29 @@ func TestCalculateMonths(t *testing.T) {
assert.Equal(t, s.time.Now(s.location).AddDate(0, 1, 0).Month(), job.nextRun.Month())
}

func TestScheduler_SingletonMode(t *testing.T) {
t.Run("next run of long running job doesn't overrun", func(t *testing.T) {
//semaphore := make(chan bool)

s := NewScheduler(time.UTC)
var trigger int32

_, err := s.Every(1).Second().SingletonMode().Do(func() {
if atomic.LoadInt32(&trigger) == 1 {
t.Fatal("Restart should not occur")
}
atomic.AddInt32(&trigger, 1)
fmt.Println("I am a long task")
time.Sleep(3 * time.Second)
})
require.NoError(t, err)

s.StartAsync()
time.Sleep(2 * time.Second)
s.Stop()
})
}

func TestScheduler_LimitRunsTo(t *testing.T) {
t.Run("job added before starting scheduler", func(t *testing.T) {
semaphore := make(chan bool)
Expand Down

0 comments on commit 982d4ef

Please sign in to comment.