From 8669cbc38baf69eeda9dc4eb2944e8ef514d12c7 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Fri, 30 Aug 2024 07:09:28 -0500 Subject: [PATCH] Fix nil periodic jobs (#572) * Do not insert a new job if PeriodicJobConstructor returns nil This commit fixes an issue with the PerioridJobConstructor ignoring the return value. According to the docs, we should ignore the job is nil is returned. * Remove unused struct * add client-level test coverage for PeriodicJobConstructor nil return --------- Co-authored-by: Andriy Semenets --- CHANGELOG.md | 5 +++-- client_test.go | 27 +++++++++++++++++++++++++++ periodic_job.go | 3 +++ periodic_job_test.go | 22 +++++++++++++++++++++- 4 files changed, 54 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97c71423..d50d0a88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,10 +21,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ```go # before - migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) # after - migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil) if err != nil { // handle error } @@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Fixed - Fixed a panic that'd occur if `StopAndCancel` was invoked before a client was started. [PR #557](https://github.com/riverqueue/river/pull/557). +- A `PeriodicJobConstructor` should be able to return `nil` `JobArgs` if it wishes to not have any job inserted. However, this was either never working or was broken at some point. It's now fixed. Thanks [@semanser](https://github.com/semanser)! [PR #572](https://github.com/riverqueue/river/pull/572). ## [0.11.4] - 2024-08-20 diff --git a/client_test.go b/client_test.go index f118e435..3bf1607c 100644 --- a/client_test.go +++ b/client_test.go @@ -2727,6 +2727,33 @@ func Test_Client_Maintenance(t *testing.T) { require.Empty(t, jobs) }) + t.Run("PeriodicJobConstructorReturningNil", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, nil) + + worker := &periodicJobWorker{} + AddWorker(config.Workers, worker) + config.PeriodicJobs = []*PeriodicJob{ + NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) { + // Returning nil from the constructor function should not insert a new + // job and should be handled cleanly + return nil, nil + }, &PeriodicJobOpts{RunOnStart: true}), + } + + client, bundle := setup(t, config) + + startAndWaitForQueueMaintainer(ctx, t, client) + + svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer) + svc.TestSignals.SkippedJob.WaitOrTimeout() + + jobs, err := bundle.exec.JobGetByKindMany(ctx, []string{(periodicJobArgs{}).Kind()}) + require.NoError(t, err) + require.Empty(t, jobs, "Expected to find zero jobs of kind: "+(periodicJobArgs{}).Kind()) + }) + t.Run("PeriodicJobEnqueuerAddDynamically", func(t *testing.T) { t.Parallel() diff --git a/periodic_job.go b/periodic_job.go index 90d55cbb..20648e09 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -183,6 +183,9 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe return &maintenance.PeriodicJob{ ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { args, options := periodicJob.constructorFunc() + if args == nil { + return nil, nil, maintenance.ErrNoJobToInsert + } return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options) }, RunOnStart: opts.RunOnStart, diff --git a/periodic_job_test.go b/periodic_job_test.go index 299b70b3..745a06d8 100644 --- a/periodic_job_test.go +++ b/periodic_job_test.go @@ -16,7 +16,7 @@ func TestPeriodicJobBundle(t *testing.T) { type testBundle struct{} - setup := func(t *testing.T) (*PeriodicJobBundle, *testBundle) { + setup := func(t *testing.T) (*PeriodicJobBundle, *testBundle) { //nolint:unparam t.Helper() periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer( @@ -59,6 +59,26 @@ func TestPeriodicJobBundle(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, mustUnmarshalJSON[TestJobArgs](t, insertParams2.EncodedArgs).JobNum) }) + + t.Run("ReturningNilDoesntInsertNewJob", func(t *testing.T) { + t.Parallel() + + periodicJobBundle, _ := setup(t) + + periodicJob := NewPeriodicJob( + PeriodicInterval(15*time.Minute), + func() (JobArgs, *InsertOpts) { + // Returning nil from the constructor function should not insert a new job. + return nil, nil + }, + nil, + ) + + internalPeriodicJob := periodicJobBundle.toInternal(periodicJob) + + _, _, err := internalPeriodicJob.ConstructorFunc() + require.ErrorIs(t, err, maintenance.ErrNoJobToInsert) + }) } func mustUnmarshalJSON[T any](t *testing.T, data []byte) *T {