From 838bd518a0974276dc0a9b2b4acfd9a97dc354ba Mon Sep 17 00:00:00 2001 From: Rodrigo Broggi Date: Thu, 19 Sep 2024 15:18:50 +0200 Subject: [PATCH] Issue 778 (#779) --- job.go | 12 +++++++++++ scheduler_test.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/job.go b/job.go index b3fbb8ea..ee206af1 100644 --- a/job.go +++ b/job.go @@ -459,6 +459,8 @@ type oneTimeJobDefinition struct { func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error { sortedTimes := o.startAt(j) slices.SortStableFunc(sortedTimes, ascendingTime) + // deduplicate the times + sortedTimes = removeSliceDuplicatesTimeOnSortedSlice(sortedTimes) // keep only schedules that are in the future idx, found := slices.BinarySearchFunc(sortedTimes, now, ascendingTime) if found { @@ -472,6 +474,16 @@ func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.T return nil } +func removeSliceDuplicatesTimeOnSortedSlice(times []time.Time) []time.Time { + ret := make([]time.Time, 0, len(times)) + for i, t := range times { + if i == 0 || t != times[i-1] { + ret = append(ret, t) + } + } + return ret +} + // OneTimeJobStartAtOption defines when the one time job is run type OneTimeJobStartAtOption func(*internalJob) []time.Time diff --git a/scheduler_test.go b/scheduler_test.go index 70aed72c..a8c719b1 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2325,6 +2325,59 @@ func TestScheduler_AtTimesJob(t *testing.T) { }, }, }, + + { + name: "two runs in the future - order is maintained even if times are provided out of order - deduplication", + atTimes: []time.Time{n.Add(3 * time.Millisecond), n.Add(1 * time.Millisecond), n.Add(1 * time.Millisecond), n.Add(3 * time.Millisecond)}, + fakeClock: clockwork.NewFakeClockAt(n), + advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){ + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + require.Equal(t, uint32(0), runs.Load()) + + // last not initialized + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, lastRunAt) + + // next is now + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(1*time.Millisecond), nextRunAt) + + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(1), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err = j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond) + + nextRunAt, err = j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(3*time.Millisecond), nextRunAt) + }, + + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(2), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(3*time.Millisecond), lastRunAt, 1*time.Millisecond) + + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, nextRunAt) + }, + }, + }, } for _, tt := range tests {