Skip to content

Commit

Permalink
clarify some names, clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 4, 2024
1 parent 5a2b2f6 commit 0f50c28
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type jobChangeType int
const (
resetJobs jobChangeType = iota
createJobs
updateJobs
finishedJobs
removedDeploymentKey
updatedHashring
)
Expand Down Expand Up @@ -183,7 +183,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
logger.Errorf(err, "failed to end cronjob %v:%v", job.DeploymentKey, job.Verb)
} else {
s.jobChanges.Publish(jobChange{
changeType: updateJobs,
changeType: finishedJobs,
jobs: []dal.CronJob{updatedJob},
})
}
Expand Down Expand Up @@ -217,8 +217,8 @@ func (s *Service) watchForUpdates(ctx context.Context) {
now := s.clock.Now()
next := time.Now().Add(time.Hour) // should never be reached, expect a different signal long beforehand
for _, j := range state.jobs {
if possibleNext, err := s.nextCheckForJob(j, state, false); err == nil {
next = *possibleNext
if possibleNext, err := s.nextAttemptForJob(j, state, false); err == nil {
next = possibleNext
break
}
}
Expand All @@ -238,7 +238,7 @@ func (s *Service) watchForUpdates(ctx context.Context) {
case <-s.clock.After(next.Sub(now)):
// Try starting jobs in db
jobsToAttempt := slices.Filter(state.jobs, func(j dal.CronJob) bool {
if next, err := s.nextCheckForJob(j, state, true); err == nil {
if next, err := s.nextAttemptForJob(j, state, true); err == nil {

Check failure on line 241 in backend/controller/cronjobs/cronjobs.go

View workflow job for this annotation

GitHub Actions / Lint

shadow: declaration of "next" shadows declaration at line 218 (govet)
return !next.After(time.Now().UTC())
}
return false
Expand Down Expand Up @@ -284,7 +284,7 @@ func (s *Service) watchForUpdates(ctx context.Context) {
case createJobs:
logger.Tracef("adding %d jobs", len(event.jobs))
state.addJobs(event.jobs)
case updateJobs:
case finishedJobs:
logger.Tracef("updating %d jobs", len(event.jobs))
state.updateJobs(event.jobs)
case removedDeploymentKey:
Expand All @@ -298,34 +298,34 @@ func (s *Service) watchForUpdates(ctx context.Context) {
}

func (s *Service) sortJobs(state *State, i, j dal.CronJob) int {
iNext, err := s.nextCheckForJob(i, state, false)
iNext, err := s.nextAttemptForJob(i, state, false)
if err != nil {
return 1
}
jNext, err := s.nextCheckForJob(j, state, false)
jNext, err := s.nextAttemptForJob(j, state, false)
if err != nil {
return -1
}
return iNext.Compare(*jNext)
return iNext.Compare(jNext)
}

func (s *Service) nextCheckForJob(job dal.CronJob, state *State, allowsNow bool) (*time.Time, error) {
func (s *Service) nextAttemptForJob(job dal.CronJob, state *State, allowsNow bool) (time.Time, error) {
if !s.isResponsibleForJob(job, state) {
return nil, fmt.Errorf("controller is not responsible for job")
return time.Now(), fmt.Errorf("controller is not responsible for job")
}
if job.State == dal.JobStateExecuting {
if state.isExecutingInCurrentController(job) {
// return a time in the future, meaning don't schedule at this time
return nil, fmt.Errorf("controller is already waiting for job to finish")
return time.Now(), fmt.Errorf("controller is already waiting for job to finish")
}
// We don't know when the other controller will finish this job
// We should check again when the next execution date is assuming the job finishes
next, err := gronx.NextTickAfter(job.Schedule, time.Now().UTC(), allowsNow)
if err == nil {
return &next, nil
return next, nil
}
}
return &job.NextExecution, nil
return job.NextExecution, nil
}

// Synchronise the hash ring with the active controllers.
Expand Down

0 comments on commit 0f50c28

Please sign in to comment.