Skip to content

Commit

Permalink
Trigger Response Undeliverable
Browse files Browse the repository at this point in the history
Adds a public API `TriggerResponse` proto message which is now returned
from the Trigger function call. This message contains a Result enum
which encompasses signalling the existing SUCCESS and FAILURE job
trigger results, as well as a new UNDELIVERABLE result.

It can be the case that a job hasn't necessarily failed, but it actually
just can't be delivered to the target at this time. To prevent the Job
going into a failure loop, receiving the `UNDELIVERABLE` result will put
the job into a new "staging" queue. This puts the job into a holding
pattern until it can be delivered.

The consumer can use the new `DeliverablePrefixes` call to register the
set of prefixes whose matching job names can be delivered. The returned
`CancelFunc` is used to de-register this prefix. Prefixes are registered
as a pool in that multiple identical prefixes can be registered and
which will remain active until all are de-registered via cancellation.

Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL committed Oct 15, 2024
1 parent e02a190 commit 1d693ad
Show file tree
Hide file tree
Showing 42 changed files with 2,335 additions and 406 deletions.
28 changes: 22 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@ func main() {
Namespace: "abc",
PartitionID: 0,
PartitionTotal: 1,
TriggerFn: func(context.Context, *api.TriggerRequest) bool {
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
// Do something with your trigger here.
// Return true if the trigger was successful, false otherwise.
// Note, returning false will cause the job to be retried according to
// the Jobs configurable FailurePolicy.
return true
// Return SUCCESS if the trigger was successful, FAILED if the trigger
// failed and should be subject to the FailurePolicy, or UNDELIVERABLE if
// the job is currently undeliverable and should be moved to the staging
// queue. Use `cron.DeliverablePrefixes` elsewhere to mark jobs with the
// given prefixes as now deliverable.
return &api.TriggerResponse{
Result: api.TriggerResponseResult_SUCCESS,
// Result: api.TriggerResponseResult_FAILED,
// Result: api.TriggerResponseResult_UNDELIVERABLE,
}
},
})
if err != nil {
Expand All @@ -43,11 +49,14 @@ func main() {
meta, _ := anypb.New(wrapperspb.String("world"))
tt := time.Now().Add(time.Second).Format(time.RFC3339)

cron.Add(context.TODO(), "my-job", &api.Job{
err = cron.Add(context.TODO(), "my-job", &api.Job{
DueTime: &tt,
Payload: payload,
Metadata: meta,
})
if err != nil {
panic(err)
}
}
```

Expand Down Expand Up @@ -79,6 +88,13 @@ A Job itself is made up of the following fields:

A job must have *at least* either a `Schedule` or a `DueTime` set.

### Undeliverable Jobs

It can be the case that a job trigger hasn't actually _failed_, but instead is simply undeliverable at the current time.
In such cases, the trigger function can return `UNDELIVERABLE` to indicate that the job should be moved to the "staging queue" to be held until it can be delivered.
Staged jobs can be marked as deliverable again by calling `cron.DeliverablePrefixes` with the prefixes of those job names.
Jobs whose name match these prefixes will be re-enqueued for delivery.

## Leadership

The cron scheduler uses a partition key ownership model to ensure that only one partition instance of the scheduler is running at any given time.
Expand Down
19 changes: 15 additions & 4 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

// TriggerFunction is the type of the function that is called when a job is
// triggered.
// Returning true will "tick" the job forward to the next scheduled time.
// Returning false will cause the job to be re-enqueued and triggered
// immediately.
type TriggerFunction func(context.Context, *TriggerRequest) bool
// The returne TriggerResponse will indicate whether the Job was successfully
// triggered, the trigger failed, or the Job need to be put into the staging
// queue.
type TriggerFunction func(context.Context, *TriggerRequest) *TriggerResponse

// API is the interface for interacting with the cron instance.
type API interface {
Expand All @@ -33,6 +33,17 @@ type API interface {

// List lists all jobs under a given job name prefix.
List(ctx context.Context, prefix string) (*ListResponse, error)

// DeliverablePrefixes registers the given Job name prefixes as being
// deliverable. Any Jobs that reside in the staging queue because they were
// undeliverable at the time of trigger but whose names match these prefixes
// will be immediately re-triggered.
// The returned CancelFunc should be called to unregister the prefixes,
// meaning these prefixes are no longer delivable by the caller. Duplicate
// Prefixes may be called together and will be pooled together, meaning that
// the prefix is still active if there is at least one DeliverablePrefixes
// call that has not been unregistered.
DeliverablePrefixes(ctx context.Context, prefixes ...string) (context.CancelFunc, error)
}

// Interface is a cron interface. It schedules and manages job which are stored
Expand Down
166 changes: 150 additions & 16 deletions api/trigger.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1d693ad

Please sign in to comment.