Skip to content

Commit

Permalink
feat: hot reload
Browse files Browse the repository at this point in the history
JVM languages can now used how reload for instant feedback
  • Loading branch information
stuartwdouglas committed Nov 15, 2024
1 parent 876aab0 commit 617f4cb
Show file tree
Hide file tree
Showing 29 changed files with 766 additions and 407 deletions.
1 change: 1 addition & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,7 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
ingressRoutes := extractIngressRoutingEntries(req.Msg)

dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes)
logger.Infof("Created deployment %s", dkey)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
Expand Down
61 changes: 49 additions & 12 deletions backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,32 @@ type localScaling struct {
portAllocator *bind.BindAllocator
controllerAddresses []*url.URL

prevRunnerSuffix int
ideSupport optional.Option[localdebug.IDEIntegration]
prevRunnerSuffix int
ideSupport optional.Option[localdebug.IDEIntegration]
devModeEndpointsUpdates <-chan scaling.DevModeEndpoints
devModeEndpoints map[string]*devModeRunner
}

type devModeRunner struct {
uri url.URL
deploymentKey string
}

func (l *localScaling) Start(ctx context.Context, endpoint url.URL, leaser leases.Leaser) error {
go func() {
for {
select {
case <-ctx.Done():
return
case devEndpoints := <-l.devModeEndpointsUpdates:
l.lock.Lock()
l.devModeEndpoints[devEndpoints.Module] = &devModeRunner{
uri: devEndpoints.Endpoint,
}
l.lock.Unlock()
}
}
}()
scaling.BeginGrpcScaling(ctx, endpoint, leaser, l.handleSchemaChange)
return nil
}
Expand Down Expand Up @@ -80,20 +101,22 @@ type runnerInfo struct {
port string
}

func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL, configPath string, enableIDEIntegration bool) (scaling.RunnerScaling, error) {
func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL, configPath string, enableIDEIntegration bool, devModeEndpoints <-chan scaling.DevModeEndpoints) (scaling.RunnerScaling, error) {

cacheDir, err := os.UserCacheDir()
if err != nil {
return nil, err
}
local := localScaling{
lock: sync.Mutex{},
cacheDir: cacheDir,
runners: map[string]map[string]*deploymentInfo{},
portAllocator: portAllocator,
controllerAddresses: controllerAddresses,
prevRunnerSuffix: -1,
debugPorts: map[string]*localdebug.DebugInfo{},
lock: sync.Mutex{},
cacheDir: cacheDir,
runners: map[string]map[string]*deploymentInfo{},
portAllocator: portAllocator,
controllerAddresses: controllerAddresses,
prevRunnerSuffix: -1,
debugPorts: map[string]*localdebug.DebugInfo{},
devModeEndpointsUpdates: devModeEndpoints,
devModeEndpoints: map[string]*devModeRunner{},
}
if enableIDEIntegration && configPath != "" {
local.ideSupport = optional.Ptr(localdebug.NewIDEIntegration(configPath))
Expand Down Expand Up @@ -165,6 +188,17 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
return nil
default:
}

devEndpoint := l.devModeEndpoints[info.module]
devUri := optional.None[url.URL]()
if devEndpoint != nil {
devUri = optional.Some(devEndpoint.uri)
if devEndpoint.deploymentKey == deploymentKey {
// Already running, don't start another
return nil
}
devEndpoint.deploymentKey = deploymentKey
}
controllerEndpoint := l.controllerAddresses[len(l.runners)%len(l.controllerAddresses)]
logger := log.FromContext(ctx)

Expand Down Expand Up @@ -197,6 +231,7 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
Key: model.NewLocalRunnerKey(keySuffix),
Deployment: deploymentKey,
DebugPort: debugPort,
DevEndpoint: devUri,
}

simpleName := fmt.Sprintf("runner%d", keySuffix)
Expand All @@ -219,10 +254,12 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
err := runner.Start(runnerCtx, config)
l.lock.Lock()
defer l.lock.Unlock()
if devEndpoint != nil {
devEndpoint.deploymentKey = ""
}
// Don't count context.Canceled as an a restart error
if err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf(err, "Runner failed: %s", err)
} else {
// Don't count context.Canceled as an a restart error
info.exits++
}
if info.exits >= maxExits {
Expand Down
5 changes: 5 additions & 0 deletions backend/controller/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,8 @@ func runGrpcScaling(ctx context.Context, url url.URL, handler func(ctx context.C
rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, client.PullSchema, handler, rpc.AlwaysRetry())
logger.Debugf("Stopped Runner Scaling")
}

type DevModeEndpoints struct {
Module string
Endpoint url.URL
}
Loading

0 comments on commit 617f4cb

Please sign in to comment.