Skip to content

Commit

Permalink
fix: build errors canceling deploy contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Apr 11, 2024
1 parent dfc8380 commit 0ef1803
Showing 1 changed file with 85 additions and 49 deletions.
134 changes: 85 additions & 49 deletions buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -45,12 +46,12 @@ type Engine struct {
projects map[ProjectKey]Project
moduleDirs []string
externalDirs []string
commands map[string]string
controllerSchema *xsync.MapOf[string, *schema.Module]
schemaChanges *pubsub.Topic[schemaChange]
cancel func()
parallelism int
listener Listener
projectsToBuild *xsync.MapOf[ProjectKey, bool]
}

type Option func(o *Engine)
Expand Down Expand Up @@ -85,6 +86,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
controllerSchema: xsync.NewMapOf[string, *schema.Module](),
schemaChanges: pubsub.New[schemaChange](),
parallelism: runtime.NumCPU(),
projectsToBuild: xsync.NewMapOf[ProjectKey, bool](),
}
for _, option := range options {
option(e)
Expand All @@ -103,6 +105,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
return nil, err
}
e.projects[project.Config().Key] = project
e.projectsToBuild.Store(project.Config().Key, true)
}

if client == nil {
Expand Down Expand Up @@ -240,8 +243,10 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
err := e.buildAndDeploy(ctx, 1, true)
if err != nil {
logger.Errorf(err, "initial deploy failed")
} else {
logger.Infof("All modules deployed, watching for changes...")
}
logger.Infof("All modules deployed, watching for changes...")

moduleHashes := map[string][]byte{}
e.controllerSchema.Range(func(name string, sch *schema.Module) bool {
hash, err := computeModuleHash(sch)
Expand Down Expand Up @@ -319,7 +324,6 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
}
}
}

}
}

Expand All @@ -346,52 +350,46 @@ func (e *Engine) getDependentProjectKeys(name string) []ProjectKey {
}

func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, projects ...ProjectKey) error {
logger := log.FromContext(ctx)
if len(projects) == 0 {
projects = maps.Keys(e.projects)
}

deployQueue := make(chan Project, len(projects))
wg, ctx := errgroup.WithContext(ctx)
buildGroup := errgroup.Group{}
deployGroup := errgroup.Group{}

// Build all projects and enqueue the modules for deployment.
wg.Go(func() error {
defer close(deployQueue)

return e.buildWithCallback(ctx, func(ctx context.Context, project Project) error {
if _, ok := project.(Module); ok {
select {
case deployQueue <- project:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
buildGroup.Go(func() error {
return e.buildWithCallback(ctx, func(buildCtx context.Context, builtProject Project) error {
deployGroup.Go(func() error {
e.projectsToBuild.Store(builtProject.Config().Key, false)
return Deploy(buildCtx, builtProject.(Module), replicas, waitForDeployOnline, e.client) //nolint:forcetypeassert
})
return nil
}, projects...)
})

// Process deployment queue.
for range len(projects) {
wg.Go(func() error {
for {
select {
case project, ok := <-deployQueue:
if !ok {
return nil
}
if module, ok := project.(Module); ok {
err := Deploy(ctx, module, replicas, waitForDeployOnline, e.client)
if err != nil {
return err
}
}
case <-ctx.Done():
return ctx.Err()
}
}
})
// Wait for all build and deploy attempts to complete
buildErr := buildGroup.Wait()
deployErr := deployGroup.Wait()

pendingInitialBuilds := []string{}
e.projectsToBuild.Range(func(key ProjectKey, value bool) bool {
if value {
pendingInitialBuilds = append(pendingInitialBuilds, string(key))
}
return true
})

// Print out all modules that have yet to build if there are any errors
if len(pendingInitialBuilds) > 0 {
logger.Infof("Modules waiting to build: %s", strings.Join(pendingInitialBuilds, ", "))
}

if buildErr != nil {
return buildErr
}
return wg.Wait()

return deployErr
}

type buildCallback func(ctx context.Context, project Project) error
Expand Down Expand Up @@ -424,39 +422,77 @@ func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback,
}

topology := TopologicalSort(graph)

errCh := make(chan error, 1024)
for _, group := range topology {
// Collect schemas to be inserted into "built" map for subsequent groups.
schemas := make(chan *schema.Module, len(group))
wg, ctx := errgroup.WithContext(ctx)

wg := errgroup.Group{}
wg.SetLimit(e.parallelism)
for _, keyStr := range group {
key := ProjectKey(keyStr)
wg.Go(func() error {
if !mustBuild[key] {
return e.mustSchema(ctx, key, builtModules, schemas)
}

err := e.build(ctx, key, builtModules, schemas)
if err == nil && callback != nil {
return callback(ctx, e.projects[key])
wg.Go(func() error {
err := e.tryBuild(ctx, mustBuild, key, builtModules, schemas, callback)
if err != nil {
errCh <- err
}
return err
return nil
})
}
err := wg.Wait()

err = wg.Wait()
if err != nil {
return err
}

// Now this group is built, collect all the schemas.
close(schemas)
for sch := range schemas {
builtModules[sch.Name] = sch
}
}

close(errCh)
allErrors := []error{}
for err := range errCh {
allErrors = append(allErrors, err)
}

if len(allErrors) > 0 {
return errors.Join(allErrors...)
}

return nil
}

func (e *Engine) tryBuild(ctx context.Context, mustBuild map[ProjectKey]bool, key ProjectKey, builtModules map[string]*schema.Module, schemas chan *schema.Module, callback buildCallback) error {
logger := log.FromContext(ctx)
if !mustBuild[key] {
return e.mustSchema(ctx, key, builtModules, schemas)
}

project, ok := e.projects[key]
if !ok {
return fmt.Errorf("project %q not found", key)
}

for _, dep := range project.Config().Dependencies {
if _, ok := builtModules[dep]; !ok {
logger.Warnf("%q build skipped because its dependency %q failed to build", key, dep)
return nil
}
}

err := e.build(ctx, key, builtModules, schemas)
if err == nil && callback != nil {
return callback(ctx, e.projects[key])
}

return err
}

// Publish either the schema from the FTL controller, or from a local build.
func (e *Engine) mustSchema(ctx context.Context, key ProjectKey, builtModules map[string]*schema.Module, schemas chan<- *schema.Module) error {
if sch, ok := e.controllerSchema.Load(string(key)); ok {
Expand Down

0 comments on commit 0ef1803

Please sign in to comment.