diff --git a/backend/controller/observability/observability.go b/backend/controller/observability/observability.go index 6d32210ae0..dbd6a1b467 100644 --- a/backend/controller/observability/observability.go +++ b/backend/controller/observability/observability.go @@ -15,7 +15,6 @@ var ( Deployment *DeploymentMetrics Ingress *IngressMetrics PubSub *PubSubMetrics - Cron *CronMetrics Controller *ControllerTracing Timeline *TimelineMetrics ) @@ -34,8 +33,6 @@ func init() { errs = errors.Join(errs, err) PubSub, err = initPubSubMetrics() errs = errors.Join(errs, err) - Cron, err = initCronMetrics() - errs = errors.Join(errs, err) Controller = initControllerTracing() Timeline, err = initTimelineMetrics() errs = errors.Join(errs, err) diff --git a/backend/controller/observability/cron.go b/backend/cron/observability/cron.go similarity index 89% rename from backend/controller/observability/cron.go rename to backend/cron/observability/cron.go index ffd5fe633d..6e796ba62e 100644 --- a/backend/controller/observability/cron.go +++ b/backend/cron/observability/cron.go @@ -3,6 +3,7 @@ package observability import ( "context" "fmt" + "time" "github.com/alecthomas/types/optional" "go.opentelemetry.io/otel" @@ -20,6 +21,8 @@ const ( cronJobKilledStatus = "killed" cronJobFailedStartStatus = "failed_start" + + deploymentMeterName = "ftl.deployments.cron" ) type CronMetrics struct { @@ -28,6 +31,16 @@ type CronMetrics struct { jobLatency metric.Int64Histogram } +var Cron *CronMetrics + +func init() { + var err error + Cron, err = initCronMetrics() + if err != nil { + panic(fmt.Errorf("could not initialize cron metrics: %w", err)) + } +} + func initCronMetrics() (*CronMetrics, error) { result := &CronMetrics{ jobsActive: noop.Int64UpDownCounter{}, @@ -108,3 +121,11 @@ func cronAttributes(job model.CronJob, maybeStatus optional.Option[string]) metr } return metric.WithAttributes(attributes...) } + +func wrapErr(signalName string, err error) error { + return fmt.Errorf("failed to create %q signal: %w", signalName, err) +} + +func timeSinceMS(start time.Time) int64 { + return time.Since(start).Milliseconds() +} diff --git a/backend/cron/service.go b/backend/cron/service.go new file mode 100644 index 0000000000..65891baf3b --- /dev/null +++ b/backend/cron/service.go @@ -0,0 +1,204 @@ +package cron + +import ( + "context" + "fmt" + "sort" + "time" + + "connectrpc.com/connect" + "github.com/jpillora/backoff" + "golang.org/x/sync/errgroup" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/ftl/internal/cron" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/slices" +) + +type PullSchemaClient interface { + PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error) +} + +type CallClient interface { + Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) +} + +type cronJob struct { + module string + verb *schema.Verb + cronmd *schema.MetadataCronJob + pattern cron.Pattern + next time.Time +} + +func (c cronJob) String() string { + desc := fmt.Sprintf("%s.%s (%s)", c.module, c.verb.Name, c.pattern) + var next string + if time.Until(c.next) > 0 { + next = fmt.Sprintf(" (next run in %s)", time.Until(c.next)) + } + return desc + next +} + +// Start the cron service. Blocks until the context is cancelled. +func Start(ctx context.Context, pullSchemaClient PullSchemaClient, verbClient CallClient) error { + wg, ctx := errgroup.WithContext(ctx) + changes := make(chan *ftlv1.PullSchemaResponse, 8) + // Start processing cron jobs and schema changes. + wg.Go(func() error { + return run(ctx, verbClient, changes) + }) + // Start watching for schema changes. + wg.Go(func() error { + rpc.RetryStreamingServerStream(ctx, backoff.Backoff{}, &ftlv1.PullSchemaRequest{}, pullSchemaClient.PullSchema, func(ctx context.Context, resp *ftlv1.PullSchemaResponse) error { + changes <- resp + return nil + }, rpc.AlwaysRetry()) + return nil + }) + err := wg.Wait() + if err != nil { + return fmt.Errorf("cron service stopped: %w", err) + } + return nil +} + +func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSchemaResponse) error { + logger := log.FromContext(ctx).Scope("cron") + // Map of cron jobs for each module. + cronJobs := map[string][]cronJob{} + // Cron jobs ordered by next execution. + cronQueue := []cronJob{} + + logger.Debugf("Starting cron service") + + for { + next, ok := scheduleNext(cronQueue) + var nextCh <-chan time.Time + if ok { + logger.Tracef("Next cron job scheduled in %s", next) + nextCh = time.After(next) + } + select { + case <-ctx.Done(): + return fmt.Errorf("cron service stopped: %w", ctx.Err()) + + case resp := <-changes: + if err := updateCronJobs(cronJobs, resp); err != nil { + logger.Errorf(err, "Failed to update cron jobs") + continue + } + cronQueue = rebuildQueue(cronJobs) + + // Execute scheduled cron job + case <-nextCh: + job := cronQueue[0] + logger.Debugf("Executing cron job %s", job) + + nextRun, err := cron.Next(job.pattern, false) + if err != nil { + logger.Errorf(err, "Failed to calculate next run time") + continue + } + job.next = nextRun + cronQueue[0] = job + orderQueue(cronQueue) + + if err := callCronJob(ctx, verbClient, job); err != nil { + logger.Errorf(err, "Failed to execute cron job") + } + } + } +} + +func callCronJob(ctx context.Context, verbClient CallClient, cronJob cronJob) error { + logger := log.FromContext(ctx).Scope("cron") + ref := schema.Ref{Module: cronJob.module, Name: cronJob.verb.Name} + logger.Debugf("Calling cron job %s", cronJob) + resp, err := verbClient.Call(ctx, connect.NewRequest(&ftlv1.CallRequest{ + Verb: ref.ToProto().(*schemapb.Ref), + Body: []byte(`{}`), + Metadata: &ftlv1.Metadata{}, + })) + if err != nil { + return fmt.Errorf("%s: call to cron job failed: %w", ref, err) + } + switch resp := resp.Msg.Response.(type) { + default: + return nil + + case *ftlv1.CallResponse_Error_: + return fmt.Errorf("%s: cron job failed: %s", ref, resp.Error.Message) + } +} + +func scheduleNext(cronQueue []cronJob) (time.Duration, bool) { + if len(cronQueue) == 0 { + return 0, false + } + return time.Until(cronQueue[0].next), true +} + +func updateCronJobs(cronJobs map[string][]cronJob, resp *ftlv1.PullSchemaResponse) error { + switch resp.ChangeType { + case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED: + delete(cronJobs, resp.ModuleName) + + case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED: + moduleSchema, err := schema.ModuleFromProto(resp.Schema) + if err != nil { + return fmt.Errorf("failed to extract module schema: %w", err) + } + moduleJobs, err := extractCronJobs(moduleSchema) + if err != nil { + return fmt.Errorf("failed to extract cron jobs: %w", err) + } + cronJobs[resp.ModuleName] = moduleJobs + } + return nil +} + +func orderQueue(queue []cronJob) { + sort.SliceStable(queue, func(i, j int) bool { + return queue[i].next.Before(queue[j].next) + }) +} + +func rebuildQueue(cronJobs map[string][]cronJob) []cronJob { + queue := make([]cronJob, 0, len(cronJobs)*2) // Assume 2 cron jobs per module. + for _, jobs := range cronJobs { + queue = append(queue, jobs...) + } + orderQueue(queue) + return queue +} + +func extractCronJobs(module *schema.Module) ([]cronJob, error) { + cronJobs := []cronJob{} + for verb := range slices.FilterVariants[*schema.Verb](module.Decls) { + cronmd, ok := slices.FindVariant[*schema.MetadataCronJob](verb.Metadata) + if !ok { + continue + } + pattern, err := cron.Parse(cronmd.Cron) + if err != nil { + return nil, fmt.Errorf("%s: %w", cronmd.Pos, err) + } + next, err := cron.Next(pattern, false) + if err != nil { + return nil, fmt.Errorf("%s: %w", cronmd.Pos, err) + } + cronJobs = append(cronJobs, cronJob{ + module: module.Name, + verb: verb, + cronmd: cronmd, + pattern: pattern, + next: next, + }) + } + return cronJobs, nil +} diff --git a/backend/cron/service_test.go b/backend/cron/service_test.go new file mode 100644 index 0000000000..261d2b8867 --- /dev/null +++ b/backend/cron/service_test.go @@ -0,0 +1,109 @@ +package cron + +import ( + "context" + "os" + "sort" + "testing" + "time" + + "connectrpc.com/connect" + "golang.org/x/sync/errgroup" + + "github.com/alecthomas/assert/v2" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/schema" +) + +type verbClient struct { + requests chan *ftlv1.CallRequest +} + +var _ CallClient = (*verbClient)(nil) + +func (v *verbClient) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { + v.requests <- req.Msg + return connect.NewResponse(&ftlv1.CallResponse{Response: &ftlv1.CallResponse_Body{Body: []byte("{}")}}), nil +} + +func TestCron(t *testing.T) { + changes := make(chan *ftlv1.PullSchemaResponse, 8) + module := &schema.Module{ + Name: "echo", + Decls: []schema.Decl{ + &schema.Verb{ + Name: "echo", + Request: &schema.Unit{}, + Response: &schema.Unit{}, + Metadata: []schema.Metadata{ + &schema.MetadataCronJob{Cron: "*/2 * * * * *"}, + }, + }, + &schema.Verb{ + Name: "time", + Request: &schema.Unit{}, + Response: &schema.Unit{}, + Metadata: []schema.Metadata{ + &schema.MetadataCronJob{Cron: "*/2 * * * * *"}, + }, + }, + }, + } + changes <- &ftlv1.PullSchemaResponse{ + ModuleName: "echo", + Schema: module.ToProto().(*schemapb.Module), //nolint:forcetypeassert + } + + ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace})) + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + t.Cleanup(cancel) + + wg, ctx := errgroup.WithContext(ctx) + + requestsch := make(chan *ftlv1.CallRequest, 8) + client := &verbClient{ + requests: requestsch, + } + + wg.Go(func() error { return run(ctx, client, changes) }) + + requests := make([]*ftlv1.CallRequest, 0, 2) + +done: + for range 2 { + select { + case <-ctx.Done(): + t.Fatalf("timed out: %s", ctx.Err()) + + case request := <-requestsch: + requests = append(requests, request) + if len(requests) == 2 { + break done + } + } + } + + cancel() + + sort.SliceStable(requests, func(i, j int) bool { + return requests[i].Verb.Name < requests[j].Verb.Name + }) + assert.Equal(t, []*ftlv1.CallRequest{ + { + Metadata: &ftlv1.Metadata{}, + Verb: &schemapb.Ref{Module: "echo", Name: "echo"}, + Body: []byte("{}"), + }, + { + Metadata: &ftlv1.Metadata{}, + Verb: &schemapb.Ref{Module: "echo", Name: "time"}, + Body: []byte("{}"), + }, + }, requests, assert.Exclude[*schemapb.Position]()) + + err := wg.Wait() + assert.IsError(t, err, context.Canceled) +} diff --git a/internal/cron/pattern.go b/internal/cron/pattern.go index 257c243063..a9a41137a3 100644 --- a/internal/cron/pattern.go +++ b/internal/cron/pattern.go @@ -38,6 +38,7 @@ var ( parser = participle.MustBuild[Pattern](parserOptions...) ) +// Pattern represents a cron schedule. type Pattern struct { Duration *string `parser:"@(Number ('s' | 'm' | 'h'))"` DayOfWeek *DayOfWeek `parser:"| @('Mon' | 'Tue' | 'Wed' | 'Thu' | 'Fri' | 'Sat' | 'Sun')"` diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index f1af32ffa5..3fd0107fd8 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -264,8 +264,8 @@ func AlwaysRetry() func(error) bool { } // RetryStreamingServerStream will repeatedly call handler with responses from -// the stream returned by "rpc" until handler returns an error or the context is -// cancelled. +// the stream returned by "rpc" until either the context is cancelled or the +// errorRetryCallback returns false. func RetryStreamingServerStream[Req, Resp any]( ctx context.Context, retry backoff.Backoff, diff --git a/internal/slices/slices.go b/internal/slices/slices.go index f25acc5a26..3dcb017128 100644 --- a/internal/slices/slices.go +++ b/internal/slices/slices.go @@ -2,6 +2,7 @@ package slices import ( "cmp" + "iter" "sort" ) @@ -105,6 +106,20 @@ func FindVariant[T any, U any](slice []U) (T, bool) { return zero, false } +// FilterVariants finds all elements in a slice that can be cast to the given type. +func FilterVariants[T any, U any](slice []U) iter.Seq[T] { + return func(yield func(T) bool) { + for _, el := range slice { + if found, ok := any(el).(T); ok { + if !yield(found) { + return + } + } + } + } +} + +// Unique returns a new slice containing only the unique elements of the input, with the order preserved. func Unique[T comparable](slice []T) []T { seen := make(map[T]struct{}) result := make([]T, 0, len(slice))