Skip to content

Commit

Permalink
fix: do not create cronjob rows when redeploying an existing one
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 11, 2024
1 parent 2ea9252 commit ac3efcf
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 141 deletions.
15 changes: 10 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,6 @@ func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.U
return nil, fmt.Errorf("could not set deployment replicas: %w", err)
}

_ = s.cronJobs.UpdatedDeploymentMinReplicas(ctx, deploymentKey, int(req.Msg.MinReplicas))

return connect.NewResponse(&ftlv1.UpdateDeployResponse{}), nil
}

Expand All @@ -442,6 +440,9 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
return nil, fmt.Errorf("could not replace deployment: %w", err)
}
}

s.cronJobs.CreatedOrReplacedDeloyment(ctx, newDeploymentKey)

return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
}

Expand Down Expand Up @@ -737,14 +738,18 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)
dname, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes)
cronJobs, startTime, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema)
if err != nil {
logger.Errorf(err, "Could not create cron jobs")
return nil, fmt.Errorf("%s: %w", "could not create deployment", err)
}

dname, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs, startTime)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
}

s.cronJobs.CreatedDeployment(ctx, dname, module)

deploymentLogger := s.getDeploymentLogger(ctx, dname)
deploymentLogger.Debugf("Created deployment %s", dname)
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dname.String()}), nil
Expand Down
115 changes: 57 additions & 58 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ import (
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/atomic"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/pubsub"
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"
"github.com/serialx/hashring"
sl "golang.org/x/exp/slices"
)

const controllersPerJob = 2
// Tuning constants for the cron job service.
const (
// NOTE(review): presumably the number of controllers the hash ring assigns to each job;
// the code that consumes this value is not visible here, so confirm before relying on it.
controllersPerJob = 2
// jobResetInterval is the duration resetJobs returns after a successful reset, and the
// maximum backoff passed to the scheduler when resetJobs is registered.
jobResetInterval = time.Minute
// newJobHashRingOverrideInterval is how long a job recorded in State.newJobs keeps being
// reported as too new for the hash ring (see isJobTooNewForHashRing). It is one minute and
// twenty seconds, i.e. slightly longer than the current jobResetInterval value.
newJobHashRingOverrideInterval = time.Minute + time.Second*20
)

type Config struct {
Timeout time.Duration
Expand All @@ -34,16 +38,14 @@ type jobChangeType int

const (
resetJobs jobChangeType = iota
createJobs
finishedJobs
removedDeploymentKey
updatedHashring
)

type jobChange struct {
changeType jobChangeType
jobs []dal.CronJob
deploymentKey model.DeploymentKey
changeType jobChangeType
jobs []dal.CronJob
addedDeploymentKey optional.Option[model.DeploymentKey]
}

type hashRingState struct {
Expand All @@ -54,7 +56,6 @@ type hashRingState struct {

type DAL interface {
GetCronJobs(ctx context.Context) ([]dal.CronJob, error)
CreateCronJob(ctx context.Context, deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) (dal.CronJob, error)
StartCronJobs(ctx context.Context, jobs []dal.CronJob) (jobMap map[dal.CronJob]bool, keysWithNoReplicas map[model.DeploymentKey]bool, err error)

Check failure on line 59 in backend/controller/cronjobs/cronjobs.go

View workflow job for this annotation

GitHub Actions / Lint

invalid map key type model.KeyType[model.DeploymentPayload, *model.DeploymentPayload] (typecheck)
EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error)
GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error)
Expand Down Expand Up @@ -98,74 +99,78 @@ func NewForTesting(ctx context.Context, key model.ControllerKey, config Config,
}
svc.UpdatedControllerList(ctx, nil)

svc.scheduler.Parallel(backoff.Backoff{Min: time.Second, Max: time.Minute}, svc.resetJobs)
svc.scheduler.Parallel(backoff.Backoff{Min: time.Second, Max: jobResetInterval}, svc.resetJobs)
svc.scheduler.Singleton(backoff.Backoff{Min: time.Second, Max: time.Minute}, svc.killOldJobs)

go svc.watchForUpdates(ctx)

return svc
}

func (s *Service) CreatedDeployment(ctx context.Context, deploymentKey model.DeploymentKey, module *schema.Module) {
func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) (jobs []dal.CronJob, startTime time.Time, err error) {
logger := log.FromContext(ctx)

start := s.clock.Now().UTC()
newJobs := []dal.CronJob{}
for _, decl := range module.Decls {
if verb, ok := decl.(*schema.Verb); ok {
if cronJob, ok := verb.GetMetadataCronJob().Get(); ok {
schedule, err := cron.Parse(cronJob.Cron)
if err != nil {
logger.Errorf(err, "failed to parse cron schedule %q", cronJob.Cron)
continue
}

start := s.clock.Now().UTC()
next, err := cron.NextAfter(schedule, start, false)
if err != nil {
logger.Errorf(err, "failed to calculate next execution for cron job %v:%v with schedule %q", deploymentKey, verb.Name, schedule)
continue
}
created, err := s.dal.CreateCronJob(ctx, deploymentKey, module.Name, verb.Name, schedule.String(), start, next)
if err != nil {
logger.Errorf(err, "failed to create cron job %v:%v", deploymentKey, verb.Name)
} else {
newJobs = append(newJobs, created)
if verb, ok := decl.Value.(*schemapb.Decl_Verb); ok {
for _, metadata := range verb.Verb.Metadata {
if cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob); ok {
cronStr := cronMetadata.CronJob.Cron
schedule, err := cron.Parse(cronStr)
if err != nil {
logger.Errorf(err, "failed to parse cron schedule %q", cronStr)
continue
}
next, err := cron.NextAfter(schedule, start, false)
if err != nil {
logger.Errorf(err, "failed to calculate next execution for cron job %v:%v with schedule %q", module.Name, verb.Verb.Name, schedule)
continue
}
newJobs = append(newJobs, dal.CronJob{
// DeploymentKey: Filled in by DAL,
Module: module.Name,
Verb: verb.Verb.Name,
Schedule: cronStr,
StartTime: start,
NextExecution: next,
State: dal.JobStateIdle,
})
}
}
}
}
return newJobs, start, nil
}

if len(newJobs) > 0 {
s.jobChanges.Publish(jobChange{
changeType: createJobs,
jobs: newJobs,
})
}
// CreatedOrReplacedDeloyment notifies the cron job service that the deployment identified by
// newDeploymentKey has been created or replaced. It reloads the full job list and passes the
// key along so that jobs belonging to that deployment can be marked as newly added.
//
// NOTE(review): "Deloyment" looks like a typo for "Deployment"; renaming it would require
// updating every caller, so it is left unchanged here.
func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentKey model.DeploymentKey) {
// Rather than finding old/new cronjobs and updating our state, we can just reset the list of jobs
// The error is deliberately discarded: this method has no error return.
// NOTE(review): confirm that resetJobsWithNewDeploymentKey logs the failure itself.
_ = s.resetJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey))
}

func (s *Service) UpdatedDeploymentMinReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) error {
// Logic for which jobs to run is based on minReplicas > 0
// If minReplicas becomes 0, we should stop jobs for that deployment key
// If minReplicas was 0 and becoming something else, we can wait for a job list reset to handle that
if minReplicas > 0 {
s.jobChanges.Publish(jobChange{
changeType: removedDeploymentKey,
deploymentKey: key,
})
func (s *Service) resetJobs(ctx context.Context) (time.Duration, error) {
err := s.resetJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]())
if err != nil {
return 0, err
}
return nil
return jobResetInterval, nil
}

func (s *Service) resetJobs(ctx context.Context) (time.Duration, error) {
// resetJobsWithNewDeploymentKey resets the list of jobs and marks the deployment key as added so that it can overrule the hash ring for a short time.
func (s *Service) resetJobsWithNewDeploymentKey(ctx context.Context, deploymentKey optional.Option[model.DeploymentKey]) error {
logger := log.FromContext(ctx)

jobs, err := s.dal.GetCronJobs(ctx)
if err != nil {
return 0, fmt.Errorf("%s: %w", "failed to get cron jobs", err)
logger.Errorf(err, "failed to get cron jobs")
return fmt.Errorf("%s: %w", "failed to get cron jobs", err)
}
s.jobChanges.Publish(jobChange{
changeType: resetJobs,
jobs: jobs,
changeType: resetJobs,
jobs: jobs,
addedDeploymentKey: deploymentKey,
})
return time.Minute, nil
return nil
}

func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
Expand Down Expand Up @@ -261,7 +266,7 @@ func (s *Service) watchForUpdates(ctx context.Context) {

state := &State{
executing: map[jobIdentifier]bool{},
newJobs: map[jobIdentifier]bool{},
newJobs: map[jobIdentifier]time.Time{},
blockedUntil: s.clock.Now(),
}

Expand Down Expand Up @@ -339,16 +344,10 @@ func (s *Service) watchForUpdates(ctx context.Context) {
switch event.changeType {
case resetJobs:
logger.Tracef("resetting job list: %d jobs", len(event.jobs))
state.reset(event.jobs)
case createJobs:
logger.Tracef("adding %d jobs", len(event.jobs))
state.addJobs(event.jobs)
state.reset(event.jobs, event.addedDeploymentKey)
case finishedJobs:
logger.Tracef("updating %d jobs", len(event.jobs))
state.updateJobs(event.jobs)
case removedDeploymentKey:
logger.Tracef("removing jobs for %q", event.deploymentKey)
state.removeDeploymentKey(event.deploymentKey)
case updatedHashring:
// do another cycle through the loop to see if new jobs need to be scheduled
}
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (d *mockDAL) GetCronJobs(ctx context.Context) ([]dal.CronJob, error) {
return d.jobs, nil
}

func (d *mockDAL) CreateCronJob(ctx context.Context, deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) (dal.CronJob, error) {
func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) (dal.CronJob, error) {
d.lock.Lock()
defer d.lock.Unlock()

Expand Down Expand Up @@ -183,7 +183,7 @@ func TestService(t *testing.T) {
assert.NoError(t, err)
next, err := cron.NextAfter(pattern, now, false)
assert.NoError(t, err)
_, err = mockDal.CreateCronJob(ctx, deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next)
_, err = mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next)
assert.NoError(t, err)
}

Expand Down
27 changes: 13 additions & 14 deletions backend/controller/cronjobs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/types/optional"
)

type jobIdentifier struct {
Expand All @@ -33,7 +34,7 @@ type State struct {

// Newly created jobs should be attempted by the controller that created them until other controllers
// have a chance to reset their job lists and share responsibilities through the hash ring
newJobs map[jobIdentifier]bool
newJobs map[jobIdentifier]time.Time

Check failure on line 37 in backend/controller/cronjobs/state.go

View workflow job for this annotation

GitHub Actions / Lint

invalid map key type jobIdentifier (typecheck)

blockedUntil time.Time
}
Expand All @@ -47,28 +48,26 @@ func (s *State) startedExecutingJob(job dal.CronJob) {
}

func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool {
return s.newJobs[identifierForJob(job)]
if t, ok := s.newJobs[identifierForJob(job)]; ok {
if time.Since(t) < newJobHashRingOverrideInterval {
return true
}
delete(s.newJobs, identifierForJob(job))
}
return false
}

func (s *State) reset(jobs []dal.CronJob) {
func (s *State) reset(jobs []dal.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) {
s.jobs = make([]dal.CronJob, len(jobs))
copy(s.jobs, jobs)
for _, job := range s.jobs {
if job.State != dal.JobStateExecuting {
delete(s.executing, identifierForJob(job))
}
}
s.newJobs = map[jobIdentifier]bool{}
}

func (s *State) addJobs(added []dal.CronJob) {
// check if created job already arrived due to a reset
existingMap := jobMap(s.jobs)
for _, job := range added {
if _, exists := existingMap[identifierForJob(job)]; !exists {
s.jobs = append(s.jobs, job)
if newKey, ok := newDeploymentKey.Get(); ok && job.DeploymentKey == newKey {
// This job is new and should be attempted by the current controller
s.newJobs[identifierForJob(job)] = time.Now()
}
s.newJobs[identifierForJob(job)] = true
}
}

Expand Down
35 changes: 17 additions & 18 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ type IngressRoutingEntry struct {
// previously created artefacts with it.
//
// If an existing deployment with identical artefacts exists, it is returned.
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry) (key model.DeploymentKey, err error) {
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []CronJob, startTime time.Time) (key model.DeploymentKey, err error) {
logger := log.FromContext(ctx)

// Start the transaction
Expand Down Expand Up @@ -490,6 +490,22 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
}
}

for _, job := range cronJobs {
// Start time must be calculated by the caller rather than generated by db
// This ensures that nextExecution is after start time, otherwise the job will never be triggered
err := tx.CreateCronJob(ctx, sql.CreateCronJobParams{
DeploymentKey: deploymentKey,
ModuleName: job.Module,
Verb: job.Verb,
StartTime: job.StartTime,
Schedule: job.Schedule,
NextExecution: job.NextExecution,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("%s: %w", "failed to create cron job", translatePGError(err))
}
}

return deploymentKey, nil
}

Expand Down Expand Up @@ -938,23 +954,6 @@ func (d *DAL) GetCronJobs(ctx context.Context) ([]CronJob, error) {
return slices.Map(rows, cronJobFromRow), nil
}

// CreateCronJob inserts a cron job row for the given deployment key, module and verb, and
// returns the created job.
//
// startTime and nextExecution are supplied by the caller and written as given. If the insert
// fails, the error is passed through translatePGError and a zero-value CronJob is returned.
func (d *DAL) CreateCronJob(ctx context.Context, deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) (CronJob, error) {
// Start time must be calculated by the caller rather than generated by db
// This ensures that nextExecution is after start time, otherwise the job will never be triggered
row, err := d.db.CreateCronJob(ctx, sql.CreateCronJobParams{
DeploymentKey: deploymentKey,
ModuleName: module,
Verb: verb,
StartTime: startTime,
Schedule: schedule,
NextExecution: nextExecution,
})
if err != nil {
return CronJob{}, translatePGError(err)
}
// Convert the inserted row to sql.GetCronJobsRow so the shared cronJobFromRow mapper can be reused.
return cronJobFromRow(sql.GetCronJobsRow(row)), nil
}

// StartCronJobs returns a full list of results so that the caller can update their list of jobs whether or not they successfully updated the row
// Also returns a map of deployment keys that have no replicas, so the caller can remove them from the list of jobs
// Note: jobs that were started may overlap with deployments that have no replicas, and this must be handled specially.
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestDAL(t *testing.T) {
Digest: testSha,
Executable: true,
Path: "dir/filename",
}}, nil)
}}, nil, nil, time.Now())
assert.NoError(t, err)
})

Expand Down
2 changes: 1 addition & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ac3efcf

Please sign in to comment.