Skip to content

Commit

Permalink
internal refactoring of JobOption constructor, clock moved to exec (#761
Browse files Browse the repository at this point in the history
)
  • Loading branch information
JohnRoesler authored Jul 18, 2024
1 parent d364449 commit 256265f
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 98 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@
- name: golangci-lint
uses: golangci/[email protected]
with:
version: v1.55.2
version: v1.59.1
- name: test
run: make test_ci
25 changes: 10 additions & 15 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@ run:
timeout: 5m
issues-exit-code: 1
tests: true
skip-dirs:
- local

issues:
max-same-issues: 100
include:
- EXC0012
- EXC0014
exclude-dirs:
- local
exclude-rules:
- path: example_test.go
linters:
- revive
text: "seems to be unused"
fix: true

linters:
enable:
Expand All @@ -29,21 +35,10 @@ linters:
- whitespace

output:
# colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
format: colored-line-number
# print lines of code with issue, default is true
formats:
- format: colored-line-number
print-issued-lines: true
# print linter name in the end of issue text, default is true
print-linter-name: true
# make issues output unique by line, default is true
uniq-by-line: true
# add a prefix to the output file references; default is no prefix
path-prefix: ""
# sorts results by: filepath, line and column
sort-results: true

linters-settings:
golint:
min-confidence: 0.8

fix: true
51 changes: 37 additions & 14 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,48 @@ import (
"sync"
"time"

"github.com/jonboulle/clockwork"

"github.com/google/uuid"
)

type executor struct {
ctx context.Context
cancel context.CancelFunc
logger Logger
stopCh chan struct{}
jobsIn chan jobIn
// context used for shutting down
ctx context.Context
// cancel used by the executor to signal a stop of it's functions
cancel context.CancelFunc
// clock used for regular time or mocking time
clock clockwork.Clock
// the executor's logger
logger Logger

// receives jobs scheduled to execute
jobsIn chan jobIn
// sends out jobs for rescheduling
jobsOutForRescheduling chan uuid.UUID
jobsOutCompleted chan uuid.UUID
jobOutRequest chan jobOutRequest
stopTimeout time.Duration
done chan error
singletonRunners *sync.Map // map[uuid.UUID]singletonRunner
limitMode *limitModeConfig
elector Elector
locker Locker
monitor Monitor
// sends out jobs once completed
jobsOutCompleted chan uuid.UUID
// used to request jobs from the scheduler
jobOutRequest chan jobOutRequest

// used by the executor to receive a stop signal from the scheduler
stopCh chan struct{}
// the timeout value when stopping
stopTimeout time.Duration
// used to signal that the executor has completed shutdown
done chan error

// runners for any singleton type jobs
// map[uuid.UUID]singletonRunner
singletonRunners *sync.Map
// config for limit mode
limitMode *limitModeConfig
// the elector when running distributed instances
elector Elector
// the locker when running distributed instances
locker Locker
// monitor for reporting metrics
monitor Monitor
}

type jobIn struct {
Expand Down
33 changes: 16 additions & 17 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,15 +475,15 @@ func OneTimeJobStartImmediately() OneTimeJobStartAtOption {
// OneTimeJobStartDateTime sets the date & time at which the job should run.
// This datetime must be in the future (according to the scheduler clock).
func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption {
return func(j *internalJob) []time.Time {
return func(_ *internalJob) []time.Time {
return []time.Time{start}
}
}

// OneTimeJobStartDateTimes sets the date & times at which the job should run.
// At least one of the date/times must be in the future (according to the scheduler clock).
func OneTimeJobStartDateTimes(times ...time.Time) OneTimeJobStartAtOption {
return func(j *internalJob) []time.Time {
return func(_ *internalJob) []time.Time {
return times
}
}
Expand All @@ -503,13 +503,13 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition {
// -----------------------------------------------

// JobOption defines the constructor for job options.
type JobOption func(*internalJob) error
type JobOption func(*internalJob, time.Time) error

// WithDistributedJobLocker sets the locker to be used by multiple
// Scheduler instances to ensure that only one instance of each
// job is run.
func WithDistributedJobLocker(locker Locker) JobOption {
return func(j *internalJob) error {
return func(j *internalJob, _ time.Time) error {
if locker == nil {
return ErrWithDistributedJobLockerNil
}
Expand All @@ -521,7 +521,7 @@ func WithDistributedJobLocker(locker Locker) JobOption {
// WithEventListeners sets the event listeners that should be
// run for the job.
func WithEventListeners(eventListeners ...EventListener) JobOption {
return func(j *internalJob) error {
return func(j *internalJob, _ time.Time) error {
for _, eventListener := range eventListeners {
if err := eventListener(j); err != nil {
return err
Expand All @@ -534,7 +534,7 @@ func WithEventListeners(eventListeners ...EventListener) JobOption {
// WithLimitedRuns limits the number of executions of this job to n.
// Upon reaching the limit, the job is removed from the scheduler.
func WithLimitedRuns(limit uint) JobOption {
return func(j *internalJob) error {
return func(j *internalJob, _ time.Time) error {
j.limitRunsTo = &limitRunsTo{
limit: limit,
runCount: 0,
Expand All @@ -546,8 +546,7 @@ func WithLimitedRuns(limit uint) JobOption {
// WithName sets the name of the job. Name provides
// a human-readable identifier for the job.
func WithName(name string) JobOption {
// TODO use the name for metrics and future logging option
return func(j *internalJob) error {
return func(j *internalJob, _ time.Time) error {
if name == "" {
return ErrWithNameEmpty
}
Expand All @@ -560,7 +559,7 @@ func WithName(name string) JobOption {
// This is useful for jobs that should not overlap, and that occasionally
// (but not consistently) run longer than the interval between job runs.
func WithSingletonMode(mode LimitMode) JobOption {
return func(j *internalJob) error {
return func(j *internalJob, _ time.Time) error {
j.singletonMode = true
j.singletonLimitMode = mode
return nil
Expand All @@ -570,19 +569,19 @@ func WithSingletonMode(mode LimitMode) JobOption {
// WithStartAt sets the option for starting the job at
// a specific datetime.
func WithStartAt(option StartAtOption) JobOption {
return func(j *internalJob) error {
return option(j)
return func(j *internalJob, now time.Time) error {
return option(j, now)
}
}

// StartAtOption defines options for starting the job
type StartAtOption func(*internalJob) error
type StartAtOption func(*internalJob, time.Time) error

// WithStartImmediately tells the scheduler to run the job immediately
// regardless of the type or schedule of job. After this immediate run
// the job is scheduled from this time based on the job definition.
func WithStartImmediately() StartAtOption {
return func(j *internalJob) error {
return func(j *internalJob, _ time.Time) error {
j.startImmediately = true
return nil
}
Expand All @@ -591,8 +590,8 @@ func WithStartImmediately() StartAtOption {
// WithStartDateTime sets the first date & time at which the job should run.
// This datetime must be in the future.
func WithStartDateTime(start time.Time) StartAtOption {
return func(j *internalJob) error {
if start.IsZero() || start.Before(time.Now()) {
return func(j *internalJob, now time.Time) error {
if start.IsZero() || start.Before(now) {
return ErrWithStartDateTimePast
}
j.startTime = start
Expand All @@ -604,7 +603,7 @@ func WithStartDateTime(start time.Time) StartAtOption {
// a way to identify jobs by a set of tags and remove
// multiple jobs by tag.
func WithTags(tags ...string) JobOption {
return func(j *internalJob) error {
return func(j *internalJob, _ time.Time) error {
j.tags = tags
return nil
}
Expand All @@ -614,7 +613,7 @@ func WithTags(tags ...string) JobOption {
// is used to uniquely identify the job and is used for logging
// and metrics.
func WithIdentifier(id uuid.UUID) JobOption {
return func(j *internalJob) error {
return func(j *internalJob, _ time.Time) error {
if id == uuid.Nil {
return ErrWithIdentifierNil
}
Expand Down
2 changes: 1 addition & 1 deletion job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func TestWithEventListeners(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var ij internalJob
err := WithEventListeners(tt.eventListeners...)(&ij)
err := WithEventListeners(tt.eventListeners...)(&ij, time.Now())
assert.Equal(t, tt.err, err)

if err != nil {
Expand Down
72 changes: 45 additions & 27 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,43 @@ type Scheduler interface {
// -----------------------------------------------

type scheduler struct {
shutdownCtx context.Context
shutdownCancel context.CancelFunc
exec executor
jobs map[uuid.UUID]internalJob
location *time.Location
clock clockwork.Clock
started bool
// context used for shutting down
shutdownCtx context.Context
// cancel used to signal scheduler should shut down
shutdownCancel context.CancelFunc
// the executor, which actually runs the jobs sent to it via the scheduler
exec executor
// the map of jobs registered in the scheduler
jobs map[uuid.UUID]internalJob
// the location used by the scheduler for scheduling when relevant
location *time.Location
// whether the scheduler has been started or not
started bool
// globally applied JobOption's set on all jobs added to the scheduler
// note: individually set JobOption's take precedence.
globalJobOptions []JobOption
logger Logger

startCh chan struct{}
startedCh chan struct{}
stopCh chan struct{}
stopErrCh chan error
allJobsOutRequest chan allJobsOutRequest
jobOutRequestCh chan jobOutRequest
runJobRequestCh chan runJobRequest
newJobCh chan newJobIn
removeJobCh chan uuid.UUID
// the scheduler's logger
logger Logger

// used to tell the scheduler to start
startCh chan struct{}
// used to report that the scheduler has started
startedCh chan struct{}
// used to tell the scheduler to stop
stopCh chan struct{}
// used to report that the scheduler has stopped
stopErrCh chan error
// used to send all the jobs out when a request is made by the client
allJobsOutRequest chan allJobsOutRequest
// used to send a jobs out when a request is made by the client
jobOutRequestCh chan jobOutRequest
// used to run a job on-demand when requested by the client
runJobRequestCh chan runJobRequest
// new jobs are received here
newJobCh chan newJobIn
// requests from the client to remove jobs by ID are received here
removeJobCh chan uuid.UUID
// requests from the client to remove jobs by tags are received here
removeJobsByTagsCh chan []string
}

Expand Down Expand Up @@ -111,6 +129,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
stopTimeout: time.Second * 10,
singletonRunners: nil,
logger: &noOpLogger{},
clock: clockwork.NewRealClock(),

jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
Expand All @@ -125,7 +144,6 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
exec: exec,
jobs: make(map[uuid.UUID]internalJob),
location: time.Local,
clock: clockwork.NewRealClock(),
logger: &noOpLogger{},

newJobCh: make(chan newJobIn),
Expand Down Expand Up @@ -338,7 +356,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
}
}
j.nextScheduled = append(j.nextScheduled, next)
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
// set the actual timer on the job here and listen for
// shut down events so that the job doesn't attempt to
// run if the scheduler has been shutdown.
Expand Down Expand Up @@ -422,7 +440,7 @@ func (s *scheduler) selectNewJob(in newJobIn) {
}

id := j.id
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
select {
case <-s.shutdownCtx.Done():
case s.exec.jobsIn <- jobIn{
Expand Down Expand Up @@ -474,7 +492,7 @@ func (s *scheduler) selectStart() {
}

jobID := id
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
select {
case <-s.shutdownCtx.Done():
case s.exec.jobsIn <- jobIn{
Expand Down Expand Up @@ -502,7 +520,7 @@ func (s *scheduler) selectStart() {
// -----------------------------------------------

func (s *scheduler) now() time.Time {
return s.clock.Now().In(s.location)
return s.exec.clock.Now().In(s.location)
}

func (s *scheduler) jobFromInternalJob(in internalJob) job {
Expand Down Expand Up @@ -643,19 +661,19 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW

// apply global job options
for _, option := range s.globalJobOptions {
if err := option(&j); err != nil {
if err := option(&j, s.now()); err != nil {
return nil, err
}
}

// apply job specific options, which take precedence
for _, option := range options {
if err := option(&j); err != nil {
if err := option(&j, s.now()); err != nil {
return nil, err
}
}

if err := definition.setup(&j, s.location, s.clock.Now()); err != nil {
if err := definition.setup(&j, s.location, s.exec.clock.Now()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -758,7 +776,7 @@ func WithClock(clock clockwork.Clock) SchedulerOption {
if clock == nil {
return ErrWithClockNil
}
s.clock = clock
s.exec.clock = clock
return nil
}
}
Expand Down
Loading

0 comments on commit 256265f

Please sign in to comment.