Skip to content

Commit

Permalink
add simple cron implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 8, 2024
1 parent 0f50c28 commit b9e085f
Show file tree
Hide file tree
Showing 13 changed files with 911 additions and 31 deletions.
14 changes: 9 additions & 5 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"connectrpc.com/connect"
Expand All @@ -13,6 +12,7 @@ import (
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
Expand Down Expand Up @@ -99,17 +99,21 @@ func (s *Service) CreatedDeployment(ctx context.Context, deploymentKey model.Dep
newJobs := []dal.CronJob{}
for _, decl := range module.Decls {
if verb, ok := decl.(*schema.Verb); ok {
if cron, ok := verb.GetMetadataCronJob().Get(); ok {
if cronJob, ok := verb.GetMetadataCronJob().Get(); ok {
//TODO: remove prefix trimming when swapping out cron lib
schedule := strings.TrimPrefix(cron.String(), "+cron ")
schedule, err := cron.Parse(cronJob.CronString())
if err != nil {
logger.Errorf(err, "failed to parse cron schedule %q", cronJob.CronString())
continue
}

start := time.Now().UTC()
next, err := gronx.NextTickAfter(schedule, start, false)
next, err := cron.NextAfter(schedule, start, false)
if err != nil {
logger.Errorf(err, "failed to calculate next execution for cron job %v:%v with schedule %q", deploymentKey, verb.Name, schedule)
continue
}
created, err := s.dal.CreateCronJob(ctx, deploymentKey, module.Name, verb.Name, schedule, start, next)
created, err := s.dal.CreateCronJob(ctx, deploymentKey, module.Name, verb.Name, schedule.String(), start, next)
if err != nil {
logger.Errorf(err, "failed to create cron job %v:%v", deploymentKey, verb.Name)
} else {
Expand Down
11 changes: 0 additions & 11 deletions backend/controller/scheduledtask/scheduledtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package scheduledtask

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/adhocore/gronx"

"github.com/alecthomas/assert/v2"
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"
Expand Down Expand Up @@ -61,11 +58,3 @@ func TestCron(t *testing.T) {
assert.True(t, singletonCount.Load() >= 5 && singletonCount.Load() < 10, "expected singletonCount to be >= 5 but was %d", singletonCount.Load())
assert.True(t, multiCount.Load() >= 20 && multiCount.Load() < 30, "expected multiCount to be >= 20 but was %d", multiCount.Load())
}

func TestCronLib(t *testing.T) {
expr := "0 0 1 1 *"
allowCurrent := true // includes current time as well
prevTime, err := gronx.PrevTickBefore(expr, time.Now().UTC(), allowCurrent) // gives time.Time, error
assert.NoError(t, err)
fmt.Printf("prev time: %s", prevTime.String())
}
8 changes: 6 additions & 2 deletions backend/schema/metadatacronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ var _ Metadata = (*MetadataCronJob)(nil)

func (m *MetadataCronJob) Position() Position { return m.Pos }
func (m *MetadataCronJob) String() string {
pattern := strings.Join(slices.Map(m.Components, func(component *CronJobComponent) string {

return fmt.Sprintf("+cron %s", m.CronString())
}

func (m *MetadataCronJob) CronString() string {
return strings.Join(slices.Map(m.Components, func(component *CronJobComponent) string {
return component.String()
}), " ")
return fmt.Sprintf("+cron %s", pattern)
}

func (m *MetadataCronJob) schemaChildren() []Node {
Expand Down
10 changes: 6 additions & 4 deletions backend/schema/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"sort"
"strings"

"github.com/adhocore/gronx"
"github.com/alecthomas/participle/v2"
"github.com/alecthomas/participle/v2/lexer"
"github.com/alecthomas/types/optional"
"golang.org/x/exp/maps"

"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/errors"
dc "github.com/TBD54566975/ftl/internal/reflect"
)
Expand Down Expand Up @@ -159,9 +159,11 @@ func Validate(schema *Schema) (*Schema, error) {
hasCron = true

//TODO: remove prefix trimming when swapping out cron lib
schedule := strings.TrimPrefix(md.String(), "+cron ")
if valid := gronx.New().IsValid(schedule); !valid {
merr = append(merr, errorf(md, "invalid cron schedule %q", schedule))
schedule, err := cron.Parse(md.CronString())
if err != nil {
merr = append(merr, err)
} else if err := cron.Validate(schedule); err != nil {
merr = append(merr, err)
}

case *MetadataCalls, *MetadataDatabases, *MetadataAlias:
Expand Down
1 change: 0 additions & 1 deletion buildengine/testdata/projects/another/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ require (
connectrpc.com/otelconnect v0.7.0 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/TBD54566975/scaffolder v0.8.0 // indirect
github.com/adhocore/gronx v1.8.1 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/kong v0.9.0 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions buildengine/testdata/projects/another/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion go-runtime/compile/testdata/one/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
connectrpc.com/otelconnect v0.7.0 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/TBD54566975/scaffolder v0.8.0 // indirect
github.com/adhocore/gronx v1.8.1 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/kong v0.9.0 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go-runtime/compile/testdata/one/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion go-runtime/compile/testdata/two/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
connectrpc.com/otelconnect v0.7.0 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/TBD54566975/scaffolder v0.8.0 // indirect
github.com/adhocore/gronx v1.8.1 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/kong v0.9.0 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go-runtime/compile/testdata/two/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b9e085f

Please sign in to comment.