From aabed7324ead92c00d60f08819ec01e43324545f Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 8 Oct 2024 15:02:21 +0100 Subject: [PATCH] Correctly prune staging queue duing scheduling events, and Deletion Signed-off-by: joshvanl --- cron/cron_test.go | 547 ------------------------------ internal/queue/queue.go | 47 ++- internal/queue/staging.go | 11 +- internal/queue/staging_test.go | 12 +- tests/suite/failurepolicy_test.go | 4 +- tests/suite/jobwithspace_test.go | 56 +++ tests/suite/undeliverable_test.go | 182 +++++++++- 7 files changed, 280 insertions(+), 579 deletions(-) create mode 100644 tests/suite/jobwithspace_test.go diff --git a/cron/cron_test.go b/cron/cron_test.go index d99186f..75ff376 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -74,550 +74,3 @@ func Test_Run(t *testing.T) { } }) } - -func Test_zeroDueTime(t *testing.T) { - t.Parallel() - - helper := testCron(t, 1) - - require.NoError(t, helper.api.Add(helper.ctx, "yoyo", &api.Job{ - Schedule: ptr.Of("@every 1h"), - DueTime: ptr.Of("0s"), - })) - assert.Eventually(t, func() bool { - return helper.triggered.Load() == 1 - }, 3*time.Second, time.Millisecond*10) - - require.NoError(t, helper.api.Add(helper.ctx, "yoyo2", &api.Job{ - Schedule: ptr.Of("@every 1h"), - DueTime: ptr.Of("1s"), - })) - assert.Eventually(t, func() bool { - return helper.triggered.Load() == 2 - }, 3*time.Second, time.Millisecond*10) - - require.NoError(t, helper.api.Add(helper.ctx, "yoyo3", &api.Job{ - Schedule: ptr.Of("@every 1h"), - })) - <-time.After(2 * time.Second) - assert.Equal(t, int64(2), helper.triggered.Load()) -} - -func Test_parallel(t *testing.T) { - t.Parallel() - - for _, test := range []struct { - name string - total uint32 - }{ - {"1 queue", 1}, - {"multi queue", 50}, - } { - total := test.total - t.Run(test.name, func(t *testing.T) { - t.Parallel() - - releaseCh := make(chan struct{}) - var waiting atomic.Int32 - var done atomic.Int32 - helper := testCronWithOptions(t, testCronOptions{ - total: total, - triggerFn: func(*api.TriggerRequest) bool { - waiting.Add(1) - <-releaseCh - done.Add(1) - return true - }, - }) - - for i := range 100 { - require.NoError(t, helper.api.Add(helper.ctx, strconv.Itoa(i), &api.Job{ - DueTime: ptr.Of("0s"), - })) - } - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, int32(100), waiting.Load()) - }, 5*time.Second, 10*time.Millisecond) - close(releaseCh) - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, int32(100), done.Load()) - }, 5*time.Second, 10*time.Millisecond) - }) - } -} - -func Test_schedule(t *testing.T) { - t.Parallel() - - t.Run("if no counter, job should not be deleted and no counter created", func(t *testing.T) { - t.Parallel() - - client := tests.EmbeddedETCDBareClient(t) - - now := time.Now().UTC() - jobBytes1, err := proto.Marshal(&stored.Job{ - Begin: &stored.Job_DueTime{DueTime: timestamppb.New(now.Add(time.Hour))}, - PartitionId: 123, - Job: &api.Job{DueTime: ptr.Of(now.Add(time.Hour).Format(time.RFC3339))}, - }) - require.NoError(t, err) - _, err = client.Put(context.Background(), "abc/jobs/1", string(jobBytes1)) - require.NoError(t, err) - - jobBytes2, err := proto.Marshal(&stored.Job{ - Begin: &stored.Job_DueTime{DueTime: timestamppb.New(now)}, - PartitionId: 123, - Job: &api.Job{DueTime: ptr.Of(now.Format(time.RFC3339))}, - }) - require.NoError(t, err) - _, err = client.Put(context.Background(), "abc/jobs/2", string(jobBytes2)) - require.NoError(t, err) - - resp, err := client.Get(context.Background(), "abc/jobs", clientv3.WithPrefix()) - require.NoError(t, err) - assert.Len(t, resp.Kvs, 2) - - cron := testCronWithOptions(t, testCronOptions{ - total: 1, - client: client, - }) - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, int64(1), cron.triggered.Load()) - }, 5*time.Second, 10*time.Millisecond) - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - resp, err = client.Get(context.Background(), "abc/jobs", clientv3.WithPrefix()) - require.NoError(t, err) - assert.Len(c, resp.Kvs, 1) - }, 5*time.Second, 10*time.Millisecond) - - cron.closeCron() - - resp, err = client.Get(context.Background(), "abc/jobs/1") - require.NoError(t, err) - require.Len(t, resp.Kvs, 1) - assert.Equal(t, string(jobBytes1), string(resp.Kvs[0].Value)) - - resp, err = client.Get(context.Background(), "abc/counters", clientv3.WithPrefix()) - require.NoError(t, err) - require.Empty(t, resp.Kvs) - - assert.Equal(t, int64(1), cron.triggered.Load()) - }) - - t.Run("if schedule is not done, job and counter should not be deleted", func(t *testing.T) { - t.Parallel() - - client := tests.EmbeddedETCDBareClient(t) - - future := time.Now().UTC().Add(time.Hour) - jobBytes, err := proto.Marshal(&stored.Job{ - Begin: &stored.Job_DueTime{ - DueTime: timestamppb.New(future), - }, - PartitionId: 123, - Job: &api.Job{ - DueTime: ptr.Of(future.Format(time.RFC3339)), - }, - }) - require.NoError(t, err) - counterBytes, err := proto.Marshal(&stored.Counter{ - LastTrigger: nil, - Count: 0, - JobPartitionId: 123, - }) - require.NoError(t, err) - - _, err = client.Put(context.Background(), "abc/jobs/1", string(jobBytes)) - require.NoError(t, err) - _, err = client.Put(context.Background(), "abc/counters/1", string(counterBytes)) - require.NoError(t, err) - - now := time.Now().UTC() - jobBytes2, err := proto.Marshal(&stored.Job{ - Begin: &stored.Job_DueTime{DueTime: timestamppb.New(now)}, - Job: &api.Job{DueTime: ptr.Of(now.Format(time.RFC3339))}, - }) - require.NoError(t, err) - _, err = client.Put(context.Background(), "abc/jobs/2", string(jobBytes2)) - require.NoError(t, err) - - cron := testCronWithOptions(t, testCronOptions{ - total: 1, - client: client, - }) - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, int64(1), cron.triggered.Load()) - }, 5*time.Second, 10*time.Millisecond) - - resp, err := client.Get(context.Background(), "abc/jobs/1") - require.NoError(t, err) - require.Len(t, resp.Kvs, 1) - assert.Equal(t, string(jobBytes), string(resp.Kvs[0].Value)) - - resp, err = client.Get(context.Background(), "abc/counters/1") - require.NoError(t, err) - require.Len(t, resp.Kvs, 1) - assert.Equal(t, string(counterBytes), string(resp.Kvs[0].Value)) - - resp, err = client.Get(context.Background(), "abc/jobs", clientv3.WithPrefix()) - require.NoError(t, err) - assert.Len(t, resp.Kvs, 1) - }) - - t.Run("if schedule is done, expect job and counter to be deleted", func(t *testing.T) { - t.Parallel() - - client := tests.EmbeddedETCDBareClient(t) - - now := time.Now().UTC() - jobBytes, err := proto.Marshal(&stored.Job{ - Begin: &stored.Job_DueTime{ - DueTime: timestamppb.New(now), - }, - PartitionId: 123, - Job: &api.Job{ - DueTime: ptr.Of(now.Format(time.RFC3339)), - }, - }) - require.NoError(t, err) - counterBytes, err := proto.Marshal(&stored.Counter{ - LastTrigger: timestamppb.New(now), - Count: 1, - JobPartitionId: 123, - }) - require.NoError(t, err) - - _, err = client.Put(context.Background(), "abc/jobs/1", string(jobBytes)) - require.NoError(t, err) - _, err = client.Put(context.Background(), "abc/counters/1", string(counterBytes)) - require.NoError(t, err) - - cron := testCronWithOptions(t, testCronOptions{ - total: 1, - client: client, - }) - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - resp, err := client.Get(context.Background(), "abc/jobs/1") - require.NoError(t, err) - assert.Empty(c, resp.Kvs) - resp, err = client.Get(context.Background(), "abc/counters/1") - require.NoError(t, err) - assert.Empty(c, resp.Kvs) - }, 5*time.Second, 10*time.Millisecond) - - assert.Equal(t, int64(0), cron.triggered.Load()) - }) -} - -func Test_jobWithSpace(t *testing.T) { - t.Parallel() - - cron := testCronWithOptions(t, testCronOptions{ - total: 1, - client: tests.EmbeddedETCDBareClient(t), - }) - - require.NoError(t, cron.api.Add(context.Background(), "hello world", &api.Job{ - DueTime: ptr.Of(time.Now().Add(2).Format(time.RFC3339)), - })) - resp, err := cron.api.Get(context.Background(), "hello world") - require.NoError(t, err) - assert.NotNil(t, resp) - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, int64(1), cron.triggered.Load()) - resp, err = cron.api.Get(context.Background(), "hello world") - assert.NoError(c, err) - assert.Nil(c, resp) - }, time.Second*10, time.Millisecond*10) - - require.NoError(t, cron.api.Add(context.Background(), "another hello world", &api.Job{ - Schedule: ptr.Of("@every 1s"), - })) - resp, err = cron.api.Get(context.Background(), "another hello world") - require.NoError(t, err) - assert.NotNil(t, resp) - listresp, err := cron.api.List(context.Background(), "") - require.NoError(t, err) - assert.Len(t, listresp.GetJobs(), 1) - require.NoError(t, cron.api.Delete(context.Background(), "another hello world")) - resp, err = cron.api.Get(context.Background(), "another hello world") - require.NoError(t, err) - assert.Nil(t, resp) - listresp, err = cron.api.List(context.Background(), "") - require.NoError(t, err) - assert.Empty(t, listresp.GetJobs()) -} - -func Test_FailurePolicy(t *testing.T) { - t.Parallel() - - t.Run("default policy should retry 3 times with a 1sec interval", func(t *testing.T) { - t.Parallel() - - gotCh := make(chan *api.TriggerRequest, 1) - var got atomic.Uint32 - cron := testCronWithOptions(t, testCronOptions{ - total: 1, - client: tests.EmbeddedETCDBareClient(t), - triggerFn: func(*api.TriggerRequest) bool { - assert.GreaterOrEqual(t, uint32(8), got.Add(1)) - return false - }, - gotCh: gotCh, - }) - - require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{ - DueTime: ptr.Of(time.Now().Format(time.RFC3339)), - Schedule: ptr.Of("@every 1s"), - Repeats: ptr.Of(uint32(2)), - })) - - for range 8 { - resp, err := cron.api.Get(context.Background(), "test") - require.NoError(t, err) - assert.NotNil(t, resp) - select { - case <-gotCh: - case <-time.After(time.Second * 3): - assert.Fail(t, "timeout waiting for trigger") - } - } - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - resp, err := cron.api.Get(context.Background(), "test") - assert.NoError(c, err) - assert.Nil(c, resp) - }, time.Second*5, time.Millisecond*10) - }) - - t.Run("drop policy should not retry triggering", func(t *testing.T) { - t.Parallel() - - gotCh := make(chan *api.TriggerRequest, 1) - var got atomic.Uint32 - cron := testCronWithOptions(t, testCronOptions{ - total: 1, - client: tests.EmbeddedETCDBareClient(t), - triggerFn: func(*api.TriggerRequest) bool { - assert.GreaterOrEqual(t, uint32(2), got.Add(1)) - return false - }, - gotCh: gotCh, - }) - - require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{ - DueTime: ptr.Of(time.Now().Format(time.RFC3339)), - Schedule: ptr.Of("@every 1s"), - Repeats: ptr.Of(uint32(2)), - FailurePolicy: &api.FailurePolicy{ - Policy: new(api.FailurePolicy_Drop), - }, - })) - - for range 2 { - resp, err := cron.api.Get(context.Background(), "test") - require.NoError(t, err) - assert.NotNil(t, resp) - select { - case <-gotCh: - case <-time.After(time.Second * 3): - assert.Fail(t, "timeout waiting for trigger") - } - } - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - resp, err := cron.api.Get(context.Background(), "test") - assert.NoError(c, err) - assert.Nil(c, resp) - }, time.Second*5, time.Millisecond*10) - }) - - t.Run("constant policy should only retry when it fails ", func(t *testing.T) { - t.Parallel() - - gotCh := make(chan *api.TriggerRequest, 1) - var got atomic.Uint32 - cron := testCronWithOptions(t, testCronOptions{ - total: 1, - client: tests.EmbeddedETCDBareClient(t), - triggerFn: func(*api.TriggerRequest) bool { - assert.GreaterOrEqual(t, uint32(5), got.Add(1)) - return got.Load() == 3 - }, - gotCh: gotCh, - }) - - require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{ - DueTime: ptr.Of(time.Now().Format(time.RFC3339)), - Schedule: ptr.Of("@every 1s"), - Repeats: ptr.Of(uint32(3)), - FailurePolicy: &api.FailurePolicy{ - Policy: &api.FailurePolicy_Constant{ - Constant: &api.FailurePolicyConstant{ - Interval: durationpb.New(time.Millisecond), MaxRetries: ptr.Of(uint32(1)), - }, - }, - }, - })) - - for range 5 { - resp, err := cron.api.Get(context.Background(), "test") - require.NoError(t, err) - assert.NotNil(t, resp) - select { - case <-gotCh: - case <-time.After(time.Second * 3): - assert.Fail(t, "timeout waiting for trigger") - } - } - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - resp, err := cron.api.Get(context.Background(), "test") - assert.NoError(c, err) - assert.Nil(c, resp) - }, time.Second*5, time.Millisecond*10) - }) - - t.Run("constant policy can retry forever until it succeeds", func(t *testing.T) { - t.Parallel() - - gotCh := make(chan *api.TriggerRequest, 1) - var got atomic.Uint32 - cron := testCronWithOptions(t, testCronOptions{ - total: 1, - client: tests.EmbeddedETCDBareClient(t), - triggerFn: func(*api.TriggerRequest) bool { - assert.GreaterOrEqual(t, uint32(100), got.Add(1)) - return got.Load() == 100 - }, - gotCh: gotCh, - }) - - require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{ - DueTime: ptr.Of(time.Now().Format(time.RFC3339)), - FailurePolicy: &api.FailurePolicy{ - Policy: &api.FailurePolicy_Constant{ - Constant: &api.FailurePolicyConstant{ - Interval: durationpb.New(time.Millisecond), - }, - }, - }, - })) - - for range 100 { - resp, err := cron.api.Get(context.Background(), "test") - require.NoError(t, err) - assert.NotNil(t, resp) - select { - case <-gotCh: - case <-time.After(time.Second * 3): - assert.Fail(t, "timeout waiting for trigger") - } - } - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - resp, err := cron.api.Get(context.Background(), "test") - assert.NoError(c, err) - assert.Nil(c, resp) - }, time.Second*5, time.Millisecond*10) - }) -} - -type testCronOptions struct { - total uint32 - gotCh chan *api.TriggerRequest - triggerFn func(*api.TriggerRequest) bool - client *clientv3.Client -} - -type helper struct { - ctx context.Context - closeCron func() - client client.Interface - api api.Interface - allCrons []api.Interface - triggered *atomic.Int64 -} - -func testCron(t *testing.T, total uint32) *helper { - t.Helper() - return testCronWithOptions(t, testCronOptions{ - total: total, - }) -} - -func testCronWithOptions(t *testing.T, opts testCronOptions) *helper { - t.Helper() - - require.Positive(t, opts.total) - cl := opts.client - if cl == nil { - cl = tests.EmbeddedETCDBareClient(t) - } - - var triggered atomic.Int64 - var a api.Interface - allCrns := make([]api.Interface, opts.total) - for i := range opts.total { - c, err := New(Options{ - Log: logr.Discard(), - Client: cl, - Namespace: "abc", - PartitionID: i, - PartitionTotal: opts.total, - TriggerFn: func(_ context.Context, req *api.TriggerRequest) bool { - defer func() { triggered.Add(1) }() - if opts.gotCh != nil { - opts.gotCh <- req - } - if opts.triggerFn != nil { - return opts.triggerFn(req) - } - return true - }, - - CounterGarbageCollectionInterval: ptr.Of(time.Millisecond * 300), - }) - require.NoError(t, err) - allCrns[i] = c - if i == 0 { - a = c - } - } - - errCh := make(chan error, opts.total) - ctx, cancel := context.WithCancel(context.Background()) - - closeOnce := sync.OnceFunc(func() { - cancel() - for range opts.total { - select { - case err := <-errCh: - require.NoError(t, err) - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for cron to stop") - } - } - }) - t.Cleanup(closeOnce) - for i := range opts.total { - go func(i uint32) { - errCh <- allCrns[i].Run(ctx) - }(i) - } - - return &helper{ - ctx: ctx, - client: client.New(client.Options{Client: cl, Log: logr.Discard()}), - api: a, - allCrons: allCrns, - triggered: &triggered, - closeCron: closeOnce, - } -} diff --git a/internal/queue/queue.go b/internal/queue/queue.go index b5d2e1b..1c31984 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -9,6 +9,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "sync/atomic" @@ -85,9 +86,8 @@ type Queue struct { // staged are the counters that have been staged for later triggering as the // consumer has signalled that the job is current undeliverable. When the // consumer signals a prefix has become deliverable, counters in that prefix - // will be enqueued. - // TODO: @joshvanl: investigate better indexing for prefixing. - staged []counter.Interface + // will be enqueued. Indexed by the counters Job Name. + staged map[string]counter.Interface stagedLock sync.Mutex // activeConsumerPrefixes tracks the job name prefixes which are currently @@ -117,6 +117,7 @@ func New(opts Options) *Queue { yard: opts.Yard, clock: cl, deliverablePrefixes: make(map[string]*atomic.Int32), + staged: make(map[string]counter.Interface), lock: concurrency.NewMutexMap[string](), readyCh: make(chan struct{}), } @@ -169,21 +170,17 @@ func (q *Queue) Delete(ctx context.Context, jobName string) error { key := q.key.JobKey(jobName) q.lock.Lock(key) - q.stagedLock.Lock() defer q.lock.DeleteUnlock(key) - defer q.stagedLock.Unlock() + + q.stagedLock.Lock() + delete(q.staged, jobName) + q.stagedLock.Unlock() if _, err := q.client.Delete(ctx, key); err != nil { return err } if _, ok := q.cache.LoadAndDelete(key); ok { - for i, counter := range q.staged { - if counter.JobName() == jobName { - q.staged = append(q.staged[:i], q.staged[i+1:]...) - break - } - } q.queue.Dequeue(key) } @@ -209,8 +206,16 @@ func (q *Queue) DeletePrefixes(ctx context.Context, prefixes ...string) error { } for _, kv := range resp.PrevKvs { - errs = append(errs, q.cacheDelete(string(kv.Key))) + q.cacheDelete(string(kv.Key)) + } + + q.stagedLock.Lock() + for jobName := range q.staged { + if strings.HasPrefix(jobName, prefix) { + delete(q.staged, jobName) + } } + q.stagedLock.Unlock() } return errors.Join(errs...) @@ -237,30 +242,36 @@ func (q *Queue) HandleInformerEvent(ctx context.Context, e *informer.Event) erro func (q *Queue) scheduleEvent(ctx context.Context, e *informer.Event) error { q.lock.Lock(string(e.Key)) + + jobName := q.key.JobName(e.Key) + + q.stagedLock.Lock() + delete(q.staged, jobName) + q.stagedLock.Unlock() + if e.IsPut { defer q.lock.Unlock(string(e.Key)) - return q.schedule(ctx, q.key.JobName(e.Key), e.Job) + return q.schedule(ctx, jobName, e.Job) } defer q.lock.DeleteUnlock(string(e.Key)) q.cache.Delete(string(e.Key)) - q.collector.Push(q.key.CounterKey(q.key.JobName(e.Key))) + q.collector.Push(q.key.CounterKey(jobName)) q.queue.Dequeue(string(e.Key)) + return nil } -func (q *Queue) cacheDelete(jobKey string) error { +func (q *Queue) cacheDelete(jobKey string) { q.lock.Lock(jobKey) defer q.lock.DeleteUnlock(jobKey) if _, ok := q.cache.Load(jobKey); !ok { - return nil + return } q.cache.Delete(jobKey) q.queue.Dequeue(jobKey) - - return nil } // handleTrigger handles triggering a schedule job. diff --git a/internal/queue/staging.go b/internal/queue/staging.go index 6375789..5ef160c 100644 --- a/internal/queue/staging.go +++ b/internal/queue/staging.go @@ -28,11 +28,10 @@ func (q *Queue) DeliverablePrefixes(prefixes ...string) context.CancelFunc { if _, ok := q.deliverablePrefixes[prefix]; !ok { q.deliverablePrefixes[prefix] = new(atomic.Int32) - for i := 0; i < len(q.staged); i++ { - if strings.HasPrefix(q.staged[i].JobName(), prefix) { - toEnqueue = append(toEnqueue, q.staged[i]) - q.staged = append(q.staged[:i], q.staged[i+1:]...) - i-- + for jobName, stage := range q.staged { + if strings.HasPrefix(jobName, prefix) { + toEnqueue = append(toEnqueue, stage) + delete(q.staged, jobName) } } } @@ -75,7 +74,7 @@ func (q *Queue) stage(counter counter.Interface) bool { } } - q.staged = append(q.staged, counter) + q.staged[jobName] = counter return true } diff --git a/internal/queue/staging_test.go b/internal/queue/staging_test.go index f3bff66..98f5571 100644 --- a/internal/queue/staging_test.go +++ b/internal/queue/staging_test.go @@ -94,6 +94,7 @@ func Test_DeliverablePrefixes(t *testing.T) { var triggered []string q := &Queue{ deliverablePrefixes: make(map[string]*atomic.Int32), + staged: make(map[string]counter.Interface), queue: queue.NewProcessor[string, counter.Interface]( func(counter counter.Interface) { lock.Lock() @@ -109,15 +110,15 @@ func Test_DeliverablePrefixes(t *testing.T) { counter4 := fake.New().WithJobName("def234").WithKey("def234") counter5 := fake.New().WithJobName("xyz123").WithKey("xyz123") counter6 := fake.New().WithJobName("xyz234").WithKey("xyz234") - q.staged = []counter.Interface{ - counter1, counter2, - counter3, counter4, - counter5, counter6, + q.staged = map[string]counter.Interface{ + "abc123": counter1, "abc234": counter2, + "def123": counter3, "def234": counter4, + "xyz123": counter5, "xyz234": counter6, } cancel := q.DeliverablePrefixes("abc", "xyz") t.Cleanup(cancel) - assert.ElementsMatch(t, []counter.Interface{counter3, counter4}, q.staged) + assert.Equal(t, map[string]counter.Interface{"def123": counter3, "def234": counter4}, q.staged) assert.EventuallyWithT(t, func(c *assert.CollectT) { lock.Lock() defer lock.Unlock() @@ -181,6 +182,7 @@ func Test_stage(t *testing.T) { q := &Queue{ deliverablePrefixes: make(map[string]*atomic.Int32), + staged: make(map[string]counter.Interface), } for _, prefix := range test.deliverablePrefixes { diff --git a/tests/suite/failurepolicy_test.go b/tests/suite/failurepolicy_test.go index 8c7fcca..6735e39 100644 --- a/tests/suite/failurepolicy_test.go +++ b/tests/suite/failurepolicy_test.go @@ -130,7 +130,7 @@ func Test_FailurePolicy(t *testing.T) { FailurePolicy: &api.FailurePolicy{ Policy: &api.FailurePolicy_Constant{ Constant: &api.FailurePolicyConstant{ - Delay: durationpb.New(time.Millisecond), MaxRetries: ptr.Of(uint32(1)), + Interval: durationpb.New(time.Millisecond), MaxRetries: ptr.Of(uint32(1)), }, }, }, @@ -177,7 +177,7 @@ func Test_FailurePolicy(t *testing.T) { FailurePolicy: &api.FailurePolicy{ Policy: &api.FailurePolicy_Constant{ Constant: &api.FailurePolicyConstant{ - Delay: durationpb.New(time.Millisecond), + Interval: durationpb.New(time.Millisecond), }, }, }, diff --git a/tests/suite/jobwithspace_test.go b/tests/suite/jobwithspace_test.go new file mode 100644 index 0000000..c0cb7f0 --- /dev/null +++ b/tests/suite/jobwithspace_test.go @@ -0,0 +1,56 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package suite + +import ( + "context" + "testing" + "time" + + "github.com/dapr/kit/ptr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/diagridio/go-etcd-cron/api" + "github.com/diagridio/go-etcd-cron/tests/framework/cron/integration" +) + +func Test_jobWithSpace(t *testing.T) { + t.Parallel() + + cron := integration.NewBase(t, 1) + + require.NoError(t, cron.API().Add(context.Background(), "hello world", &api.Job{ + DueTime: ptr.Of(time.Now().Add(2).Format(time.RFC3339)), + })) + resp, err := cron.API().Get(context.Background(), "hello world") + require.NoError(t, err) + assert.NotNil(t, resp) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, 1, cron.Triggered()) + resp, err = cron.API().Get(context.Background(), "hello world") + assert.NoError(c, err) + assert.Nil(c, resp) + }, time.Second*10, time.Millisecond*10) + + require.NoError(t, cron.API().Add(context.Background(), "another hello world", &api.Job{ + Schedule: ptr.Of("@every 1s"), + })) + resp, err = cron.API().Get(context.Background(), "another hello world") + require.NoError(t, err) + assert.NotNil(t, resp) + listresp, err := cron.API().List(context.Background(), "") + require.NoError(t, err) + assert.Len(t, listresp.GetJobs(), 1) + require.NoError(t, cron.API().Delete(context.Background(), "another hello world")) + resp, err = cron.API().Get(context.Background(), "another hello world") + require.NoError(t, err) + assert.Nil(t, resp) + listresp, err = cron.API().List(context.Background(), "") + require.NoError(t, err) + assert.Empty(t, listresp.GetJobs()) +} diff --git a/tests/suite/undeliverable_test.go b/tests/suite/undeliverable_test.go index 77162e4..70b19e1 100644 --- a/tests/suite/undeliverable_test.go +++ b/tests/suite/undeliverable_test.go @@ -73,7 +73,7 @@ func Test_undeliverable(t *testing.T) { assert.ElementsMatch(t, append(names, names...), got) }) - t.Run("single: jobs which are marked as undeliverable, should be triggered when their prefix is registered", func(t *testing.T) { + t.Run("multiple: jobs which are marked as undeliverable, should be triggered when their prefix is registered", func(t *testing.T) { t.Parallel() var got []string @@ -454,4 +454,184 @@ func Test_undeliverable(t *testing.T) { <-time.After(time.Second) assert.Equal(t, trigger, inTrigger.Load()) }) + + t.Run("Deleting a staged job should not be triggered once it has been marked as deliverable", func(t *testing.T) { + t.Parallel() + + var triggered []string + var lock sync.Mutex + var i int + cron := integration.New(t, integration.Options{ + PartitionTotal: 1, + TriggerFn: func(req *api.TriggerRequest) *api.TriggerResponse { + lock.Lock() + defer lock.Unlock() + i++ + triggered = append(triggered, req.GetName()) + if len(triggered) <= 2 { + return &api.TriggerResponse{Result: api.TriggerResponseResult_UNDELIVERABLE} + } + return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS} + }, + }) + + require.NoError(t, cron.API().Add(cron.Context(), "abc1", &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(time.Now().Format(time.RFC3339)), + })) + require.NoError(t, cron.API().Add(cron.Context(), "xyz1", &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(time.Now().Format(time.RFC3339)), + })) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + lock.Lock() + defer lock.Unlock() + assert.Equal(c, []string{"abc1", "xyz1"}, triggered) + }, time.Second*10, time.Millisecond*10) + + require.NoError(t, cron.API().Delete(cron.Context(), "abc1")) + + cancel, err := cron.API().DeliverablePrefixes(cron.Context(), "abc") + require.NoError(t, err) + t.Cleanup(cancel) + time.Sleep(time.Second * 2) + assert.Equal(t, []string{"abc1", "xyz1"}, triggered) + }) + + t.Run("Deleting prefixes staged jobs should not be triggered once it has been marked as deliverable", func(t *testing.T) { + t.Parallel() + + var triggered []string + var lock sync.Mutex + cron := integration.New(t, integration.Options{ + PartitionTotal: 1, + TriggerFn: func(req *api.TriggerRequest) *api.TriggerResponse { + lock.Lock() + defer lock.Unlock() + triggered = append(triggered, req.GetName()) + if len(triggered) < 4 { + return &api.TriggerResponse{Result: api.TriggerResponseResult_UNDELIVERABLE} + } + return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS} + }, + }) + + require.NoError(t, cron.API().Add(cron.Context(), "abc1", &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(time.Now().Format(time.RFC3339)), + })) + require.NoError(t, cron.API().Add(cron.Context(), "def1", &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(time.Now().Format(time.RFC3339)), + })) + require.NoError(t, cron.API().Add(cron.Context(), "xyz1", &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(time.Now().Format(time.RFC3339)), + })) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + lock.Lock() + defer lock.Unlock() + assert.Equal(c, []string{"abc1", "def1", "xyz1"}, triggered) + }, time.Second*10, time.Millisecond*10) + + require.NoError(t, cron.API().DeletePrefixes(cron.Context(), "abc", "def")) + + cancel, err := cron.API().DeliverablePrefixes(cron.Context(), "abc", "def") + require.NoError(t, err) + t.Cleanup(cancel) + time.Sleep(time.Second * 2) + assert.Equal(t, []string{"abc1", "def1", "xyz1"}, triggered) + }) + + t.Run("Re-scheduling the job should not trigger the old staged job when prefix is added", func(t *testing.T) { + t.Parallel() + + var ret atomic.Value + var triggered atomic.Uint32 + ret.Store(api.TriggerResponseResult_UNDELIVERABLE) + + cron := integration.New(t, integration.Options{ + PartitionTotal: 1, + TriggerFn: func(*api.TriggerRequest) *api.TriggerResponse { + triggered.Add(1) + return &api.TriggerResponse{Result: ret.Load().(api.TriggerResponseResult)} + }, + }) + + dueTime := ptr.Of(time.Now().Format(time.RFC3339)) + require.NoError(t, cron.API().Add(cron.Context(), "abc1", &api.Job{DueTime: dueTime})) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, uint32(1), triggered.Load()) + }, time.Second*10, time.Millisecond*10) + + ret.Store(api.TriggerResponseResult_SUCCESS) + require.NoError(t, cron.API().Add(cron.Context(), "abc1", &api.Job{DueTime: dueTime})) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, uint32(2), triggered.Load()) + }, time.Second*10, time.Millisecond*10) + + cancel, err := cron.API().DeliverablePrefixes(cron.Context(), "abc") + require.NoError(t, err) + t.Cleanup(cancel) + time.Sleep(time.Second * 2) + assert.Equal(t, uint32(2), triggered.Load()) + }) + + t.Run("Re-scheduling the job after multiple puts should not trigger the old staged job when prefix is added", func(t *testing.T) { + t.Parallel() + + var ret atomic.Value + var triggered atomic.Uint32 + ret.Store(api.TriggerResponseResult_UNDELIVERABLE) + + cron := integration.New(t, integration.Options{ + PartitionTotal: 1, + TriggerFn: func(*api.TriggerRequest) *api.TriggerResponse { + triggered.Add(1) + return &api.TriggerResponse{Result: ret.Load().(api.TriggerResponseResult)} + }, + }) + + dueTime := ptr.Of(time.Now().Format(time.RFC3339)) + require.NoError(t, cron.API().Add(cron.Context(), "abc1", &api.Job{DueTime: dueTime})) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, uint32(1), triggered.Load()) + }, time.Second*10, time.Millisecond*10) + + ret.Store(api.TriggerResponseResult_SUCCESS) + cancel, err := cron.API().DeliverablePrefixes(cron.Context(), "abc") + require.NoError(t, err) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, uint32(2), triggered.Load()) + }, time.Second*10, time.Millisecond*10) + resp, err := cron.API().Get(cron.Context(), "abc1") + require.NoError(t, err) + assert.Nil(t, resp) + cancel() + + ret.Store(api.TriggerResponseResult_UNDELIVERABLE) + require.NoError(t, cron.API().Add(cron.Context(), "abc1", &api.Job{DueTime: dueTime})) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, uint32(3), triggered.Load()) + }, time.Second*10, time.Millisecond*10) + + ret.Store(api.TriggerResponseResult_SUCCESS) + require.NoError(t, cron.API().Add(cron.Context(), "abc1", &api.Job{DueTime: dueTime})) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, uint32(4), triggered.Load()) + }, time.Second*10, time.Millisecond*10) + resp, err = cron.API().Get(cron.Context(), "abc1") + require.NoError(t, err) + assert.Nil(t, resp) + + cancel, err = cron.API().DeliverablePrefixes(cron.Context(), "abc") + require.NoError(t, err) + t.Cleanup(cancel) + time.Sleep(time.Second * 2) + assert.Equal(t, uint32(4), triggered.Load()) + }) }