From 0b8479c456fe25d9ea7d61e777d2c330a2f786e1 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Mon, 22 Jul 2024 05:36:17 -0400 Subject: [PATCH] feat: add lock update mechanism for jobs, closes #762 - Add `jobOutUpdateLockRequest` channel in the executor - Implement lock update requests in the job execution process - Add `Lock()` method to the `Job` interface - Update the scheduler to handle lock update requests - Add a test case to verify the new locking mechanism --- executor.go | 20 ++++++++++++++++++-- job.go | 8 ++++++++ scheduler.go | 26 +++++++++++++++++++++----- scheduler_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/executor.go b/executor.go index af4b7c98..5cc38112 100644 --- a/executor.go +++ b/executor.go @@ -30,6 +30,8 @@ type executor struct { jobsOutCompleted chan uuid.UUID // used to request jobs from the scheduler jobOutRequest chan jobOutRequest + // used to request jobs from the scheduler + jobOutUpdateLockRequest chan jobOutUpdateLockRequest // used by the executor to receive a stop signal from the scheduler stopCh chan struct{} @@ -376,7 +378,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { e.incrementJobCounter(j, Skip) return } - defer func() { _ = lock.Unlock(j.ctx) }() + e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{ + id: j.id, + lock: lock, + } + + defer func() { + _ = lock.Unlock(j.ctx) + }() } else if e.locker != nil { lock, err := e.locker.Lock(j.ctx, j.name) if err != nil { @@ -385,7 +394,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { e.incrementJobCounter(j, Skip) return } - defer func() { _ = lock.Unlock(j.ctx) }() + e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{ + id: j.id, + lock: lock, + } + + defer func() { + _ = lock.Unlock(j.ctx) + }() } _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) diff --git a/job.go b/job.go index 890889ed..0a34b331 100644 --- a/job.go +++ b/job.go @@ -30,6 +30,7 @@ type internalJob struct { nextScheduled []time.Time lastRun time.Time + lastLock Lock function any parameters []any timer clockwork.Timer @@ -1018,6 +1019,7 @@ type Job interface { RunNow() error // Tags returns the job's string tags. Tags() []string + Lock() Lock } var _ Job = (*job)(nil) @@ -1116,3 +1118,9 @@ func (j job) RunNow() error { } return err } + +func (j job) Lock() Lock { + ij := requestJob(j.id, j.jobOutRequest) + + return ij.lastLock +} diff --git a/scheduler.go b/scheduler.go index 90ff5212..7a5dbaf9 100644 --- a/scheduler.go +++ b/scheduler.go @@ -107,6 +107,11 @@ type jobOutRequest struct { outChan chan internalJob } +type jobOutUpdateLockRequest struct { + id uuid.UUID + lock Lock +} + type runJobRequest struct { id uuid.UUID outChan chan error @@ -131,11 +136,12 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { logger: &noOpLogger{}, clock: clockwork.NewRealClock(), - jobsIn: make(chan jobIn), - jobsOutForRescheduling: make(chan uuid.UUID), - jobsOutCompleted: make(chan uuid.UUID), - jobOutRequest: make(chan jobOutRequest, 1000), - done: make(chan error), + jobsIn: make(chan jobIn), + jobsOutForRescheduling: make(chan uuid.UUID), + jobsOutCompleted: make(chan uuid.UUID), + jobOutRequest: make(chan jobOutRequest, 1000), + jobOutUpdateLockRequest: make(chan jobOutUpdateLockRequest), + done: make(chan error), } s := &scheduler{ @@ -190,6 +196,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { case out := <-s.jobOutRequestCh: s.selectJobOutRequest(out) + case out := <-s.exec.jobOutUpdateLockRequest: + s.jobOutUpdateLockRequest(out) + case out := <-s.allJobsOutRequest: s.selectAllJobsOutRequest(out) @@ -432,6 +441,13 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) { close(out.outChan) } +func (s *scheduler) jobOutUpdateLockRequest(out jobOutUpdateLockRequest) { + if j, ok := s.jobs[out.id]; ok { + j.lastLock = out.lock + s.jobs[out.id] = j + } +} + func (s *scheduler) selectNewJob(in newJobIn) { j := in.job if s.started { diff --git a/scheduler_test.go b/scheduler_test.go index d335c4a1..1463d13a 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2633,3 +2633,44 @@ func TestScheduler_WithMonitor(t *testing.T) { }) } } + +func TestJob_Lock(t *testing.T) { + locker := &testLocker{ + notLocked: make(chan struct{}, 1), + } + + s := newTestScheduler(t, + WithDistributedLocker(locker), + ) + + jobRan := make(chan struct{}) + j, err := s.NewJob( + DurationJob(time.Millisecond*100), + NewTask(func() { + time.Sleep(50 * time.Millisecond) + jobRan <- struct{}{} + }), + ) + require.NoError(t, err) + + s.Start() + defer s.Shutdown() + + select { + case <-jobRan: + // Job has run + case <-time.After(200 * time.Millisecond): + t.Fatal("Job did not run in time") + } + + require.Eventually(t, func() bool { + if locker.jobLocked { + return true + } + + return false + }, 200*time.Millisecond, 100*time.Millisecond, "Job should be locked") + + lock := j.Lock() + assert.NotNil(t, lock, "Job Lock() should return a non-nil Locker") +}