diff --git a/go.mod b/go.mod index 81bf5ec660..de99c6ad8f 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/AlecAivazis/survey/v2 v2.3.7 github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 github.com/fatih/color v1.15.0 - github.com/go-co-op/gocron v1.33.1 + github.com/go-co-op/gocron v1.35.3 github.com/go-gormigrate/gormigrate/v2 v2.1.1 github.com/go-testfixtures/testfixtures/v3 v3.9.0 github.com/golang-jwt/jwt v3.2.2+incompatible @@ -82,7 +82,7 @@ require ( github.com/google/go-cmp v0.5.9 // indirect github.com/google/go-containerregistry v0.16.1 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/uuid v1.3.1 // indirect + github.com/google/uuid v1.4.0 // indirect github.com/gorilla/securecookie v1.1.1 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 // indirect diff --git a/go.sum b/go.sum index b6d8fa7d45..922e21a622 100644 --- a/go.sum +++ b/go.sum @@ -425,8 +425,8 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/gliderlabs/ssh v0.2.2/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk= github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= -github.com/go-co-op/gocron v1.33.1 h1:wjX+Dg6Ae29a/f9BSQjY1Rl+jflTpW9aDyMqseCj78c= -github.com/go-co-op/gocron v1.33.1/go.mod h1:NLi+bkm4rRSy1F8U7iacZOz0xPseMoIOnvabGoSe/no= +github.com/go-co-op/gocron v1.35.3 h1:it2WjWnabS8eJZ+P68WroBe+ZWyJ3kVjRD6KXdpr5yI= +github.com/go-co-op/gocron v1.35.3/go.mod h1:3L/n6BkO7ABj+TrfSVXLRzsP26zmikL4ISkLQ0O8iNY= github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= @@ -605,8 +605,8 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.1 h1:SBWmZhjUDRorQxrN0nwzf+AHBxnbFjViHQS4P0yVpmQ= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= diff --git a/vendor/github.com/go-co-op/gocron/README.md b/vendor/github.com/go-co-op/gocron/README.md index f4a227d6f8..33815c1935 100644 --- a/vendor/github.com/go-co-op/gocron/README.md +++ b/vendor/github.com/go-co-op/gocron/README.md @@ -1,5 +1,6 @@ # gocron: A Golang Job Scheduling Package. +[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge-flat.svg)](https://github.com/avelino/awesome-go#job-scheduler) [![CI State](https://github.com/go-co-op/gocron/actions/workflows/go_test.yml/badge.svg?branch=main&event=push)](https://github.com/go-co-op/gocron/actions) ![Go Report Card](https://goreportcard.com/badge/github.com/go-co-op/gocron) [![Go Doc](https://godoc.org/github.com/go-co-op/gocron?status.svg)](https://pkg.go.dev/github.com/go-co-op/gocron) @@ -79,6 +80,13 @@ s.CronWithSeconds("*/1 * * * * *").Do(task) // every second s.StartAsync() // starts the scheduler and blocks current execution path s.StartBlocking() + +// stop the running scheduler in two different ways: +// stop the scheduler +s.Stop() + +// stop the scheduler and notify the `StartBlocking()` to exit +s.StopBlockingChan() ``` For more examples, take a look in our [go docs](https://pkg.go.dev/github.com/go-co-op/gocron#pkg-examples) @@ -104,6 +112,7 @@ There are several options available to restrict how jobs run: | Job singleton | `SingletonMode()` | a long running job will not be rescheduled until the current run is completed | | Scheduler limit | `SetMaxConcurrentJobs()` | set a collective maximum number of concurrent jobs running across the scheduler | | Distributed locking | `WithDistributedLocker()` | prevents the same job from being run more than once when running multiple instances of the scheduler | +| Distributed elector | `WithDistributedElector()` | multiple instances exist in a distributed scenario, only the leader instance can run jobs | ## Distributed Locker Implementations diff --git a/vendor/github.com/go-co-op/gocron/executor.go b/vendor/github.com/go-co-op/gocron/executor.go index 401f7fcb98..894468c4c6 100644 --- a/vendor/github.com/go-co-op/gocron/executor.go +++ b/vendor/github.com/go-co-op/gocron/executor.go @@ -32,7 +32,6 @@ const ( // // blocked trying to send to the buffered channel // time.Sleep(10 * time.Minute) // }) - WaitMode ) @@ -54,7 +53,8 @@ type executor struct { limitModeRunningJobs *atomic.Int64 // tracks the count of running jobs to check against the max stopped *atomic.Bool // allow workers to drain the buffered limitModeQueue - distributedLocker Locker // support running jobs across multiple instances + distributedLocker Locker // support running jobs across multiple instances + distributedElector Elector // support running jobs across multiple instances } func newExecutor() executor { @@ -128,7 +128,11 @@ func (e *executor) limitModeRunner() { return case jf := <-e.limitModeQueue: if !e.stopped.Load() { - e.runJob(jf) + select { + case <-jf.ctx.Done(): + default: + e.runJob(jf) + } } } } @@ -154,12 +158,25 @@ func (e *executor) start() { } func (e *executor) runJob(f jobFunction) { + defer func() { + if e.limitMode == RescheduleMode && e.limitModeMaxRunningJobs > 0 { + e.limitModeRunningJobs.Add(-1) + } + }() switch f.runConfig.mode { case defaultMode: lockKey := f.jobName if lockKey == "" { lockKey = f.funcName } + if e.distributedElector != nil { + err := e.distributedElector.IsLeader(e.ctx) + if err != nil { + return + } + runJob(f) + return + } if e.distributedLocker != nil { l, err := e.distributedLocker.Lock(f.ctx, lockKey) if err != nil || l == nil { @@ -170,8 +187,17 @@ func (e *executor) runJob(f jobFunction) { if durationToNextRun > time.Second*5 { durationToNextRun = time.Second * 5 } + + delay := time.Duration(float64(durationToNextRun) * 0.9) + if e.limitModeMaxRunningJobs > 0 { + time.AfterFunc(delay, func() { + _ = l.Unlock(f.ctx) + }) + return + } + if durationToNextRun > time.Millisecond*100 { - timer := time.NewTimer(time.Duration(float64(durationToNextRun) * 0.9)) + timer := time.NewTimer(delay) defer timer.Stop() select { @@ -181,6 +207,8 @@ func (e *executor) runJob(f jobFunction) { } _ = l.Unlock(f.ctx) }() + runJob(f) + return } runJob(f) case singletonMode: @@ -225,6 +253,7 @@ func (e *executor) run() { if e.limitModeRunningJobs.Load() < int64(e.limitModeMaxRunningJobs) { select { case e.limitModeQueue <- f: + e.limitModeRunningJobs.Inc() case <-e.ctx.Done(): } } diff --git a/vendor/github.com/go-co-op/gocron/gocron.go b/vendor/github.com/go-co-op/gocron/gocron.go index 3203d5638c..def4383daf 100644 --- a/vendor/github.com/go-co-op/gocron/gocron.go +++ b/vendor/github.com/go-co-op/gocron/gocron.go @@ -136,9 +136,9 @@ func getFunctionNameOfPointer(fn interface{}) string { func parseTime(t string) (hour, min, sec int, err error) { var timeLayout string switch { - case timeWithSeconds.Match([]byte(t)): + case timeWithSeconds.MatchString(t): timeLayout = "15:04:05" - case timeWithoutSeconds.Match([]byte(t)): + case timeWithoutSeconds.MatchString(t): timeLayout = "15:04" default: return 0, 0, 0, ErrUnsupportedTimeFormat diff --git a/vendor/github.com/go-co-op/gocron/job.go b/vendor/github.com/go-co-op/gocron/job.go index 57fb81f753..13f0197937 100644 --- a/vendor/github.com/go-co-op/gocron/job.go +++ b/vendor/github.com/go-co-op/gocron/job.go @@ -177,6 +177,15 @@ func (j *Job) Name(name string) { j.jobName = name } +// GetName returns the name of the current job. +// The name is either the name set using Job.Name() / Scheduler.Name() or +// the name of the funcion as Go sees it, for example `main.func1` +func (j *Job) GetName() string { + j.mu.Lock() + defer j.mu.Unlock() + return j.jobFunction.getName() +} + func (j *Job) setRandomInterval(a, b int) { j.random.rand = rand.New(rand.NewSource(time.Now().UnixNano())) // nolint @@ -473,6 +482,9 @@ func (j *Job) Weekdays() []time.Weekday { if len(j.scheduledWeekdays) == 0 { return []time.Weekday{time.Sunday} } + sort.Slice(j.scheduledWeekdays, func(i, k int) bool { + return j.scheduledWeekdays[i] < j.scheduledWeekdays[k] + }) return j.scheduledWeekdays } diff --git a/vendor/github.com/go-co-op/gocron/locker.go b/vendor/github.com/go-co-op/gocron/locker.go index dc713f9b3c..193b20c70d 100644 --- a/vendor/github.com/go-co-op/gocron/locker.go +++ b/vendor/github.com/go-co-op/gocron/locker.go @@ -21,3 +21,10 @@ type Locker interface { type Lock interface { Unlock(ctx context.Context) error } + +// Elector determines the leader from instances asking to be the leader. Only +// the leader runs jobs. If the leader goes down, a new leader will be elected. +type Elector interface { + // IsLeader should return an error if the job should not be scheduled and nil if the job should be scheduled. + IsLeader(ctx context.Context) error +} diff --git a/vendor/github.com/go-co-op/gocron/scheduler.go b/vendor/github.com/go-co-op/gocron/scheduler.go index d011f61bc2..f51e0bde82 100644 --- a/vendor/github.com/go-co-op/gocron/scheduler.go +++ b/vendor/github.com/go-co-op/gocron/scheduler.go @@ -104,11 +104,13 @@ func (s *Scheduler) StartAsync() { func (s *Scheduler) start() { s.executor.start() s.setRunning(true) - s.runJobs(s.jobsMap()) + s.runJobs() } -func (s *Scheduler) runJobs(jobs map[uuid.UUID]*Job) { - for _, job := range jobs { +func (s *Scheduler) runJobs() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { ctx, cancel := context.WithCancel(context.Background()) job.mu.Lock() job.ctx = ctx @@ -142,13 +144,13 @@ func (s *Scheduler) Jobs() []*Job { // JobsMap returns a map of job uuid to job func (s *Scheduler) JobsMap() map[uuid.UUID]*Job { - return s.jobsMap() -} - -func (s *Scheduler) jobsMap() map[uuid.UUID]*Job { s.jobsMutex.RLock() defer s.jobsMutex.RUnlock() - return s.jobs + jobs := make(map[uuid.UUID]*Job, len(s.jobs)) + for id, job := range s.jobs { + jobs[id] = job + } + return jobs } // Name sets the name of the current job. @@ -161,12 +163,6 @@ func (s *Scheduler) Name(name string) *Scheduler { return s } -func (s *Scheduler) setJobs(jobs map[uuid.UUID]*Job) { - s.jobsMutex.Lock() - defer s.jobsMutex.Unlock() - s.jobs = jobs -} - // Len returns the number of Jobs in the Scheduler func (s *Scheduler) Len() int { s.jobsMutex.RLock() @@ -225,7 +221,7 @@ func (s *Scheduler) scheduleNextRun(job *Job) (bool, nextRun) { } if !job.shouldRun() { - s.RemoveByReference(job) + _ = s.RemoveByID(job) return false, nextRun{} } @@ -397,8 +393,8 @@ func (s *Scheduler) calculateWeeks(job *Job, lastRun time.Time) nextRun { func (s *Scheduler) calculateTotalDaysDifference(lastRun time.Time, daysToWeekday int, job *Job) int { if job.getInterval() > 1 { - // just count weeks after the first jobs were done - if job.RunCount() < len(job.Weekdays()) { + weekDays := job.Weekdays() + if job.lastRun.Weekday() != weekDays[len(weekDays)-1] { return daysToWeekday } if daysToWeekday > 0 { @@ -508,21 +504,23 @@ func (s *Scheduler) roundToMidnightAndAddDSTAware(t time.Time, d time.Duration) // NextRun datetime when the next Job should run. func (s *Scheduler) NextRun() (*Job, time.Time) { - if len(s.jobsMap()) <= 0 { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + if len(s.jobs) <= 0 { return nil, time.Time{} } var jobID uuid.UUID var nearestRun time.Time - for _, job := range s.jobsMap() { + for _, job := range s.jobs { nr := job.NextRun() - if nr.Before(nearestRun) && s.now().Before(nr) { + if (nr.Before(nearestRun) || nearestRun.IsZero()) && s.now().Before(nr) { nearestRun = nr jobID = job.id } } - return s.jobsMap()[jobID], nearestRun + return s.jobs[jobID], nearestRun } // EveryRandom schedules a new period Job that runs at random intervals @@ -539,6 +537,7 @@ func (s *Scheduler) EveryRandom(lower, upper int) *Scheduler { // Every schedules a new periodic Job with an interval. // Interval can be an int, time.Duration or a string that // parses with time.ParseDuration(). +// Negative intervals will return an error. // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". // // The job is run immediately, unless: @@ -555,6 +554,9 @@ func (s *Scheduler) Every(interval interface{}) *Scheduler { job.error = wrapOrError(job.error, ErrInvalidInterval) } case time.Duration: + if interval <= 0 { + job.error = wrapOrError(job.error, ErrInvalidInterval) + } job.setInterval(0) job.setDuration(interval) job.setUnit(duration) @@ -563,6 +565,9 @@ func (s *Scheduler) Every(interval interface{}) *Scheduler { if err != nil { job.error = wrapOrError(job.error, err) } + if d <= 0 { + job.error = wrapOrError(job.error, ErrInvalidInterval) + } job.setDuration(d) job.setUnit(duration) default: @@ -648,7 +653,9 @@ func (s *Scheduler) RunAll() { // RunAllWithDelay runs all Jobs with the provided delay in between each Job func (s *Scheduler) RunAllWithDelay(d time.Duration) { - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { s.run(job) s.time.Sleep(d) } @@ -696,16 +703,13 @@ func (s *Scheduler) Remove(job interface{}) { // RemoveByReference removes specific Job by reference func (s *Scheduler) RemoveByReference(job *Job) { - s.removeJobsUniqueTags(job) - s.removeByCondition(func(someJob *Job) bool { - job.mu.RLock() - defer job.mu.RUnlock() - return someJob == job - }) + _ = s.RemoveByID(job) } func (s *Scheduler) findJobByTaskName(name string) *Job { - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { if job.funcName == name { return job } @@ -725,15 +729,23 @@ func (s *Scheduler) removeJobsUniqueTags(job *Job) { } func (s *Scheduler) removeByCondition(shouldRemove func(*Job) bool) { - retainedJobs := make(map[uuid.UUID]*Job, 0) - for _, job := range s.jobsMap() { - if !shouldRemove(job) { - retainedJobs[job.id] = job - } else { - job.stop() + s.jobsMutex.Lock() + defer s.jobsMutex.Unlock() + for _, job := range s.jobs { + if shouldRemove(job) { + s.stopJob(job) + delete(s.jobs, job.id) } } - s.setJobs(retainedJobs) +} + +func (s *Scheduler) stopJob(job *Job) { + job.mu.Lock() + if job.runConfig.mode == singletonMode { + s.executor.singletonWgs.Delete(job.singletonWg) + } + job.mu.Unlock() + job.stop() } // RemoveByTag will remove jobs that match the given tag. @@ -749,7 +761,7 @@ func (s *Scheduler) RemoveByTags(tags ...string) error { } for _, job := range jobs { - s.RemoveByReference(job) + _ = s.RemoveByID(job) } return nil } @@ -769,7 +781,7 @@ func (s *Scheduler) RemoveByTagsAny(tags ...string) error { } for job := range mJob { - s.RemoveByReference(job) + _ = s.RemoveByID(job) } return errs @@ -777,8 +789,12 @@ func (s *Scheduler) RemoveByTagsAny(tags ...string) error { // RemoveByID removes the job from the scheduler looking up by id func (s *Scheduler) RemoveByID(job *Job) error { - if _, ok := s.jobsMap()[job.id]; ok { - delete(s.jobsMap(), job.id) + s.jobsMutex.Lock() + defer s.jobsMutex.Unlock() + if _, ok := s.jobs[job.id]; ok { + s.removeJobsUniqueTags(job) + s.stopJob(job) + delete(s.jobs, job.id) return nil } return ErrJobNotFound @@ -788,8 +804,10 @@ func (s *Scheduler) RemoveByID(job *Job) error { func (s *Scheduler) FindJobsByTag(tags ...string) ([]*Job, error) { var jobs []*Job + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() Jobs: - for _, job := range s.jobsMap() { + for _, job := range s.jobs { if job.hasTags(tags...) { jobs = append(jobs, job) continue Jobs @@ -860,7 +878,9 @@ func (s *Scheduler) SingletonModeAll() { // TaskPresent checks if specific job's function was added to the scheduler. func (s *Scheduler) TaskPresent(j interface{}) bool { - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { if job.funcName == getFunctionName(j) { return true } @@ -868,25 +888,21 @@ func (s *Scheduler) TaskPresent(j interface{}) bool { return false } -// To avoid the recursive read lock on s.jobsMap() and this function, -// creating this new function and distributing the lock between jobPresent, _jobPresent -func (s *Scheduler) _jobPresent(j *Job, jobs map[uuid.UUID]*Job) bool { +func (s *Scheduler) jobPresent(j *Job) bool { s.jobsMutex.RLock() defer s.jobsMutex.RUnlock() - if _, ok := jobs[j.id]; ok { + if _, ok := s.jobs[j.id]; ok { return true } return false } -func (s *Scheduler) jobPresent(j *Job) bool { - return s._jobPresent(j, s.jobsMap()) -} - // Clear clears all Jobs from this scheduler func (s *Scheduler) Clear() { s.stopJobs() - s.setJobs(make(map[uuid.UUID]*Job, 0)) + s.jobsMutex.Lock() + defer s.jobsMutex.Unlock() + s.jobs = make(map[uuid.UUID]*Job) // If unique tags was enabled, delete all the tags loaded in the tags sync.Map if s.tagsUnique { s.tags.Range(func(key interface{}, value interface{}) bool { @@ -942,7 +958,7 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e if job.error != nil { // delete the job from the scheduler as this job // cannot be executed - s.RemoveByReference(job) + _ = s.RemoveByID(job) return nil, job.error } @@ -953,7 +969,7 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e if val.Kind() != reflect.Func { // delete the job for the same reason as above - s.RemoveByReference(job) + _ = s.RemoveByID(job) return nil, ErrNotAFunction } @@ -980,13 +996,13 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e } if len(params) != expectedParamLength { - s.RemoveByReference(job) + _ = s.RemoveByID(job) job.error = wrapOrError(job.error, ErrWrongParams) return nil, job.error } if job.runWithDetails && val.Type().In(len(params)).Kind() != reflect.ValueOf(*job).Kind() { - s.RemoveByReference(job) + _ = s.RemoveByID(job) job.error = wrapOrError(job.error, ErrDoWithJobDetails) return nil, job.error } @@ -1066,7 +1082,9 @@ func (s *Scheduler) Tag(t ...string) *Scheduler { // GetAllTags returns all tags. func (s *Scheduler) GetAllTags() []string { var tags []string - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { tags = append(tags, job.Tags()...) } return tags @@ -1092,12 +1110,12 @@ func (s *Scheduler) setUnit(unit schedulingUnit) { job.setUnit(unit) } -// Millisecond sets the unit with seconds +// Millisecond sets the unit with milliseconds func (s *Scheduler) Millisecond() *Scheduler { return s.Milliseconds() } -// Milliseconds sets the unit with seconds +// Milliseconds sets the unit with milliseconds func (s *Scheduler) Milliseconds() *Scheduler { s.setUnit(milliseconds) return s @@ -1465,9 +1483,6 @@ func (s *Scheduler) StopBlockingChan() { // WithDistributedLocker prevents the same job from being run more than once // when multiple schedulers are trying to schedule the same job. // -// NOTE - This is currently in BETA. Please provide any feedback on your usage -// and open bugs with any issues. -// // One strategy to reduce splay in the job execution times when using // intervals (e.g. 1s, 1m, 1h), on each scheduler instance, is to use // StartAt with time.Now().Round(interval) to start the job at the @@ -1487,13 +1502,27 @@ func (s *Scheduler) WithDistributedLocker(l Locker) { s.executor.distributedLocker = l } +// WithDistributedElector prevents the same job from being run more than once +// when multiple schedulers are trying to schedule the same job, by allowing only +// the leader to run jobs. Non-leaders wait until the leader instance goes down +// and then a new leader is elected. +// +// Compared with the distributed lock, the election is the same as leader/follower framework. +// All jobs are only scheduled and execute on the leader scheduler instance. Only when the leader scheduler goes down +// and one of the scheduler instances is successfully elected, then the new leader scheduler instance can schedule jobs. +func (s *Scheduler) WithDistributedElector(e Elector) { + s.executor.distributedElector = e +} + // RegisterEventListeners accepts EventListeners and registers them for all jobs // in the scheduler at the time this function is called. // The event listeners are then called at the times described by each listener. // If a new job is added, an additional call to this method, or the job specific // version must be executed in order for the new job to trigger event listeners. func (s *Scheduler) RegisterEventListeners(eventListeners ...EventListener) { - for _, job := range s.jobsMap() { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + for _, job := range s.jobs { job.RegisterEventListeners(eventListeners...) } } diff --git a/vendor/github.com/google/uuid/CHANGELOG.md b/vendor/github.com/google/uuid/CHANGELOG.md index 2bd78667af..7ed347d3ad 100644 --- a/vendor/github.com/google/uuid/CHANGELOG.md +++ b/vendor/github.com/google/uuid/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog +## [1.4.0](https://github.com/google/uuid/compare/v1.3.1...v1.4.0) (2023-10-26) + + +### Features + +* UUIDs slice type with Strings() convenience method ([#133](https://github.com/google/uuid/issues/133)) ([cd5fbbd](https://github.com/google/uuid/commit/cd5fbbdd02f3e3467ac18940e07e062be1f864b4)) + +### Fixes + +* Clarify that Parse's job is to parse but not necessarily validate strings. (Documents current behavior) + ## [1.3.1](https://github.com/google/uuid/compare/v1.3.0...v1.3.1) (2023-08-18) diff --git a/vendor/github.com/google/uuid/CONTRIBUTING.md b/vendor/github.com/google/uuid/CONTRIBUTING.md index 5566888726..a502fdc515 100644 --- a/vendor/github.com/google/uuid/CONTRIBUTING.md +++ b/vendor/github.com/google/uuid/CONTRIBUTING.md @@ -11,7 +11,7 @@ please explain why in the pull request description. ### Releasing -Commits that would precipitate a SemVer change, as desrcibed in the Conventional +Commits that would precipitate a SemVer change, as described in the Conventional Commits Specification, will trigger [`release-please`](https://github.com/google-github-actions/release-please-action) to create a release candidate pull request. Once submitted, `release-please` will create a release. diff --git a/vendor/github.com/google/uuid/uuid.go b/vendor/github.com/google/uuid/uuid.go index a56138cc4b..dc75f7d990 100644 --- a/vendor/github.com/google/uuid/uuid.go +++ b/vendor/github.com/google/uuid/uuid.go @@ -56,11 +56,15 @@ func IsInvalidLengthError(err error) bool { return ok } -// Parse decodes s into a UUID or returns an error. Both the standard UUID -// forms of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx and -// urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx are decoded as well as the -// Microsoft encoding {xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx} and the raw hex -// encoding: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx. +// Parse decodes s into a UUID or returns an error if it cannot be parsed. Both +// the standard UUID forms defined in RFC 4122 +// (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx and +// urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) are decoded. In addition, +// Parse accepts non-standard strings such as the raw hex encoding +// xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx and 38 byte "Microsoft style" encodings, +// e.g. {xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx}. Only the middle 36 bytes are +// examined in the latter case. Parse should not be used to validate strings as +// it parses non-standard encodings as indicated above. func Parse(s string) (UUID, error) { var uuid UUID switch len(s) { @@ -294,3 +298,15 @@ func DisableRandPool() { poolMu.Lock() poolPos = randPoolSize } + +// UUIDs is a slice of UUID types. +type UUIDs []UUID + +// Strings returns a string slice containing the string form of each UUID in uuids. +func (uuids UUIDs) Strings() []string { + var uuidStrs = make([]string, len(uuids)) + for i, uuid := range uuids { + uuidStrs[i] = uuid.String() + } + return uuidStrs +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 4db96e4f64..54c8371fa4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -123,7 +123,7 @@ github.com/fsnotify/fsnotify # github.com/go-chi/chi/v5 v5.0.10 ## explicit; go 1.14 github.com/go-chi/chi/v5 -# github.com/go-co-op/gocron v1.33.1 +# github.com/go-co-op/gocron v1.35.3 ## explicit; go 1.16 github.com/go-co-op/gocron # github.com/go-faster/city v1.0.1 @@ -220,7 +220,7 @@ github.com/google/go-containerregistry/pkg/v1/types ## explicit; go 1.12 github.com/google/gofuzz github.com/google/gofuzz/bytesource -# github.com/google/uuid v1.3.1 +# github.com/google/uuid v1.4.0 ## explicit github.com/google/uuid # github.com/gorilla/handlers v1.5.1