Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cron jobs #1155

Merged
merged 17 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/TBD54566975/ftl/backend/controller/cronjobs"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/scaling"
Expand All @@ -49,10 +50,11 @@ import (

// CommonConfig between the production controller and development server.
type CommonConfig struct {
AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"`
NoConsole bool `help:"Disable the console."`
IdleRunners int `help:"Number of idle runners to keep around (not supported in production)." default:"3"`
WaitFor []string `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"`
AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"`
NoConsole bool `help:"Disable the console."`
IdleRunners int `help:"Number of idle runners to keep around (not supported in production)." default:"3"`
WaitFor []string `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"`
CronJobTimeout time.Duration `help:"Timeout for cron jobs." default:"5m"`
}

type Config struct {
Expand Down Expand Up @@ -138,12 +140,20 @@ type clients struct {
runner ftlv1connect.RunnerServiceClient
}

// ControllerListListener is regularly notified of the current list of controllers
// This is often used to update a hash ring to distribute work.
type ControllerListListener interface {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than scheduledtask and cronjob services both polling the db for the controller list to update their hashrings, I've centralised the polling into the controller, which then broadcasts it to listeners

matt2e marked this conversation as resolved.
Show resolved Hide resolved
UpdatedControllerList(ctx context.Context, controllers []dal.Controller)
}

type Service struct {
dal *dal.DAL
key model.ControllerKey
deploymentLogsSink *deploymentLogsSink

tasks *scheduledtask.Scheduler
tasks *scheduledtask.Scheduler
cronJobs *cronjobs.Service
controllerListListeners []ControllerListListener

// Map from endpoint to client.
clients *ttlcache.Cache[string, clients]
Expand All @@ -163,7 +173,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
}
config.SetDefaults()
svc := &Service{
tasks: scheduledtask.New(ctx, key, db),
tasks: scheduledtask.New(ctx, key),
dal: db,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
Expand All @@ -174,8 +184,13 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
increaseReplicaFailures: map[string]int{},
}

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest)
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc)

svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes)
svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, svc.heartbeatController)
svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 5, Max: time.Second * 5}, svc.updateControllersList)
svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 10}, svc.reapStaleRunners)
svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.releaseExpiredReservations)
svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileDeployments)
Expand Down Expand Up @@ -422,6 +437,9 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
return nil, fmt.Errorf("could not replace deployment: %w", err)
}
}

s.cronJobs.CreatedOrReplacedDeloyment(ctx, newDeploymentKey)

return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
}

Expand Down Expand Up @@ -732,11 +750,18 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)
dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, nil)
cronJobs, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema)
if err != nil {
logger.Errorf(err, "Could not generate cron jobs for new deployment")
return nil, fmt.Errorf("could not generate cron jobs for new deployment: %w", err)
}

dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
}

deploymentLogger := s.getDeploymentLogger(ctx, dkey)
deploymentLogger.Debugf("Created deployment %s", dkey)
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil
Expand Down Expand Up @@ -999,7 +1024,17 @@ func (s *Service) heartbeatController(ctx context.Context) (time.Duration, error
return 0, fmt.Errorf("failed to heartbeat controller: %w", err)
}
return time.Second * 3, nil
}

func (s *Service) updateControllersList(ctx context.Context) (time.Duration, error) {
controllers, err := s.dal.GetControllers(ctx, false)
if err != nil {
return 0, err
}
for _, listener := range s.controllerListListeners {
listener.UpdatedControllerList(ctx, controllers)
}
return time.Second * 5, nil
}

func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(response *ftlv1.PullSchemaResponse) error) error {
Expand Down
Loading