Skip to content

Commit

Permalink
feat: add lock update mechanism for jobs, closes #762
Browse files Browse the repository at this point in the history
- Add `jobOutUpdateLockRequest` channel in the executor
- Implement lock update requests in the job execution process
- Add `Lock()` method to the `Job` interface
- Update the scheduler to handle lock update requests
- Add a test case to verify the new locking mechanism
  • Loading branch information
pcfreak30 committed Dec 3, 2024
1 parent c7c0a17 commit 0b8479c
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 7 deletions.
20 changes: 18 additions & 2 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type executor struct {
jobsOutCompleted chan uuid.UUID
// used to request jobs from the scheduler
jobOutRequest chan jobOutRequest
// used to request jobs from the scheduler
jobOutUpdateLockRequest chan jobOutUpdateLockRequest

// used by the executor to receive a stop signal from the scheduler
stopCh chan struct{}
Expand Down Expand Up @@ -376,7 +378,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.incrementJobCounter(j, Skip)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
id: j.id,
lock: lock,
}

defer func() {
_ = lock.Unlock(j.ctx)
}()
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
Expand All @@ -385,7 +394,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.incrementJobCounter(j, Skip)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
id: j.id,
lock: lock,
}

defer func() {
_ = lock.Unlock(j.ctx)
}()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

Expand Down
8 changes: 8 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type internalJob struct {
nextScheduled []time.Time

lastRun time.Time
lastLock Lock
function any
parameters []any
timer clockwork.Timer
Expand Down Expand Up @@ -1018,6 +1019,7 @@ type Job interface {
RunNow() error
// Tags returns the job's string tags.
Tags() []string
Lock() Lock
}

var _ Job = (*job)(nil)
Expand Down Expand Up @@ -1116,3 +1118,9 @@ func (j job) RunNow() error {
}
return err
}

func (j job) Lock() Lock {
ij := requestJob(j.id, j.jobOutRequest)

return ij.lastLock
}
26 changes: 21 additions & 5 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ type jobOutRequest struct {
outChan chan internalJob
}

type jobOutUpdateLockRequest struct {
id uuid.UUID
lock Lock
}

type runJobRequest struct {
id uuid.UUID
outChan chan error
Expand All @@ -131,11 +136,12 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
logger: &noOpLogger{},
clock: clockwork.NewRealClock(),

jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
jobOutUpdateLockRequest: make(chan jobOutUpdateLockRequest),
done: make(chan error),
}

s := &scheduler{
Expand Down Expand Up @@ -190,6 +196,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
case out := <-s.jobOutRequestCh:
s.selectJobOutRequest(out)

case out := <-s.exec.jobOutUpdateLockRequest:
s.jobOutUpdateLockRequest(out)

case out := <-s.allJobsOutRequest:
s.selectAllJobsOutRequest(out)

Expand Down Expand Up @@ -432,6 +441,13 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
close(out.outChan)
}

func (s *scheduler) jobOutUpdateLockRequest(out jobOutUpdateLockRequest) {
if j, ok := s.jobs[out.id]; ok {
j.lastLock = out.lock
s.jobs[out.id] = j
}
}

func (s *scheduler) selectNewJob(in newJobIn) {
j := in.job
if s.started {
Expand Down
41 changes: 41 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2633,3 +2633,44 @@ func TestScheduler_WithMonitor(t *testing.T) {
})
}
}

func TestJob_Lock(t *testing.T) {
locker := &testLocker{
notLocked: make(chan struct{}, 1),
}

s := newTestScheduler(t,
WithDistributedLocker(locker),
)

jobRan := make(chan struct{})
j, err := s.NewJob(
DurationJob(time.Millisecond*100),
NewTask(func() {
time.Sleep(50 * time.Millisecond)
jobRan <- struct{}{}
}),
)
require.NoError(t, err)

s.Start()
defer s.Shutdown()

Check failure on line 2657 in scheduler_test.go

View workflow job for this annotation

GitHub Actions / lint and test (1.22)

Error return value of `s.Shutdown` is not checked (errcheck)

select {
case <-jobRan:
// Job has run
case <-time.After(200 * time.Millisecond):
t.Fatal("Job did not run in time")
}

require.Eventually(t, func() bool {
if locker.jobLocked {

Check failure on line 2667 in scheduler_test.go

View workflow job for this annotation

GitHub Actions / lint and test (1.22)

S1008: should use 'return locker.jobLocked' instead of 'if locker.jobLocked { return true }; return false' (gosimple)
return true
}

return false
}, 200*time.Millisecond, 100*time.Millisecond, "Job should be locked")

lock := j.Lock()
assert.NotNil(t, lock, "Job Lock() should return a non-nil Locker")
}

0 comments on commit 0b8479c

Please sign in to comment.