diff --git a/CHANGELOG.md b/CHANGELOG.md index 97c71423..d50d0a88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,10 +21,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ```go # before - migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) # after - migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil) if err != nil { // handle error } @@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Fixed - Fixed a panic that'd occur if `StopAndCancel` was invoked before a client was started. [PR #557](https://github.com/riverqueue/river/pull/557). +- A `PeriodicJobConstructor` should be able to return `nil` `JobArgs` if it wishes to not have any job inserted. However, this was either never working or was broken at some point. It's now fixed. Thanks [@semanser](https://github.com/semanser)! [PR #572](https://github.com/riverqueue/river/pull/572). ## [0.11.4] - 2024-08-20 diff --git a/client_test.go b/client_test.go index f118e435..3bf1607c 100644 --- a/client_test.go +++ b/client_test.go @@ -2727,6 +2727,33 @@ func Test_Client_Maintenance(t *testing.T) { require.Empty(t, jobs) }) + t.Run("PeriodicJobConstructorReturningNil", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, nil) + + worker := &periodicJobWorker{} + AddWorker(config.Workers, worker) + config.PeriodicJobs = []*PeriodicJob{ + NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) { + // Returning nil from the constructor function should not insert a new + // job and should be handled cleanly + return nil, nil + }, &PeriodicJobOpts{RunOnStart: true}), + } + + client, bundle := setup(t, config) + + startAndWaitForQueueMaintainer(ctx, t, client) + + svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer) + svc.TestSignals.SkippedJob.WaitOrTimeout() + + jobs, err := bundle.exec.JobGetByKindMany(ctx, []string{(periodicJobArgs{}).Kind()}) + require.NoError(t, err) + require.Empty(t, jobs, "Expected to find zero jobs of kind: "+(periodicJobArgs{}).Kind()) + }) + t.Run("PeriodicJobEnqueuerAddDynamically", func(t *testing.T) { t.Parallel() diff --git a/periodic_job.go b/periodic_job.go index 90d55cbb..20648e09 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -183,6 +183,9 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe return &maintenance.PeriodicJob{ ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { args, options := periodicJob.constructorFunc() + if args == nil { + return nil, nil, maintenance.ErrNoJobToInsert + } return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options) }, RunOnStart: opts.RunOnStart, diff --git a/periodic_job_test.go b/periodic_job_test.go index 299b70b3..745a06d8 100644 --- a/periodic_job_test.go +++ b/periodic_job_test.go @@ -16,7 +16,7 @@ func TestPeriodicJobBundle(t *testing.T) { type testBundle struct{} - setup := func(t *testing.T) (*PeriodicJobBundle, *testBundle) { + setup := func(t *testing.T) (*PeriodicJobBundle, *testBundle) { //nolint:unparam t.Helper() periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer( @@ -59,6 +59,26 @@ func TestPeriodicJobBundle(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, mustUnmarshalJSON[TestJobArgs](t, insertParams2.EncodedArgs).JobNum) }) + + t.Run("ReturningNilDoesntInsertNewJob", func(t *testing.T) { + t.Parallel() + + periodicJobBundle, _ := setup(t) + + periodicJob := NewPeriodicJob( + PeriodicInterval(15*time.Minute), + func() (JobArgs, *InsertOpts) { + // Returning nil from the constructor function should not insert a new job. + return nil, nil + }, + nil, + ) + + internalPeriodicJob := periodicJobBundle.toInternal(periodicJob) + + _, _, err := internalPeriodicJob.ConstructorFunc() + require.ErrorIs(t, err, maintenance.ErrNoJobToInsert) + }) } func mustUnmarshalJSON[T any](t *testing.T, data []byte) *T {