Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: build errors canceling deploy contexts #1225

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 85 additions & 49 deletions buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -45,12 +46,12 @@ type Engine struct {
projects map[ProjectKey]Project
moduleDirs []string
externalDirs []string
commands map[string]string
controllerSchema *xsync.MapOf[string, *schema.Module]
schemaChanges *pubsub.Topic[schemaChange]
cancel func()
parallelism int
listener Listener
projectsToBuild *xsync.MapOf[ProjectKey, bool]
}

type Option func(o *Engine)
Expand Down Expand Up @@ -85,6 +86,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
controllerSchema: xsync.NewMapOf[string, *schema.Module](),
schemaChanges: pubsub.New[schemaChange](),
parallelism: runtime.NumCPU(),
projectsToBuild: xsync.NewMapOf[ProjectKey, bool](),
}
for _, option := range options {
option(e)
Expand All @@ -103,6 +105,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
return nil, err
}
e.projects[project.Config().Key] = project
e.projectsToBuild.Store(project.Config().Key, true)
}

if client == nil {
Expand Down Expand Up @@ -240,8 +243,10 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
err := e.buildAndDeploy(ctx, 1, true)
if err != nil {
logger.Errorf(err, "initial deploy failed")
} else {
logger.Infof("All modules deployed, watching for changes...")
}
logger.Infof("All modules deployed, watching for changes...")

moduleHashes := map[string][]byte{}
e.controllerSchema.Range(func(name string, sch *schema.Module) bool {
hash, err := computeModuleHash(sch)
Expand Down Expand Up @@ -319,7 +324,6 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
}
}
}

}
}

Expand All @@ -346,52 +350,46 @@ func (e *Engine) getDependentProjectKeys(name string) []ProjectKey {
}

func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, projects ...ProjectKey) error {
logger := log.FromContext(ctx)
if len(projects) == 0 {
projects = maps.Keys(e.projects)
}

deployQueue := make(chan Project, len(projects))
wg, ctx := errgroup.WithContext(ctx)
buildGroup := errgroup.Group{}
deployGroup := errgroup.Group{}

// Build all projects and enqueue the modules for deployment.
wg.Go(func() error {
defer close(deployQueue)

return e.buildWithCallback(ctx, func(ctx context.Context, project Project) error {
if _, ok := project.(Module); ok {
select {
case deployQueue <- project:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
buildGroup.Go(func() error {
return e.buildWithCallback(ctx, func(buildCtx context.Context, builtProject Project) error {
deployGroup.Go(func() error {
e.projectsToBuild.Store(builtProject.Config().Key, false)
return Deploy(buildCtx, builtProject.(Module), replicas, waitForDeployOnline, e.client) //nolint:forcetypeassert
})
return nil
}, projects...)
})

// Process deployment queue.
for range len(projects) {
wg.Go(func() error {
for {
select {
case project, ok := <-deployQueue:
if !ok {
return nil
}
if module, ok := project.(Module); ok {
err := Deploy(ctx, module, replicas, waitForDeployOnline, e.client)
if err != nil {
return err
}
}
case <-ctx.Done():
return ctx.Err()
}
}
})
// Wait for all build and deploy attempts to complete
buildErr := buildGroup.Wait()
deployErr := deployGroup.Wait()

pendingInitialBuilds := []string{}
e.projectsToBuild.Range(func(key ProjectKey, value bool) bool {
if value {
pendingInitialBuilds = append(pendingInitialBuilds, string(key))
}
return true
})

// Print out all modules that have yet to build if there are any errors
if len(pendingInitialBuilds) > 0 {
logger.Infof("Modules waiting to build: %s", strings.Join(pendingInitialBuilds, ", "))
}

if buildErr != nil {
return buildErr
}
return wg.Wait()

return deployErr
}

type buildCallback func(ctx context.Context, project Project) error
Expand Down Expand Up @@ -424,39 +422,77 @@ func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback,
}

topology := TopologicalSort(graph)

errCh := make(chan error, 1024)
for _, group := range topology {
// Collect schemas to be inserted into "built" map for subsequent groups.
schemas := make(chan *schema.Module, len(group))
wg, ctx := errgroup.WithContext(ctx)

wg := errgroup.Group{}
wg.SetLimit(e.parallelism)
for _, keyStr := range group {
key := ProjectKey(keyStr)
wg.Go(func() error {
if !mustBuild[key] {
return e.mustSchema(ctx, key, builtModules, schemas)
}

err := e.build(ctx, key, builtModules, schemas)
if err == nil && callback != nil {
return callback(ctx, e.projects[key])
wg.Go(func() error {
err := e.tryBuild(ctx, mustBuild, key, builtModules, schemas, callback)
if err != nil {
errCh <- err
}
return err
return nil
})
}
err := wg.Wait()

err = wg.Wait()
if err != nil {
return err
}

// Now this group is built, collect all the schemas.
close(schemas)
for sch := range schemas {
builtModules[sch.Name] = sch
}
}

close(errCh)
allErrors := []error{}
for err := range errCh {
allErrors = append(allErrors, err)
}

if len(allErrors) > 0 {
return errors.Join(allErrors...)
}

return nil
}

func (e *Engine) tryBuild(ctx context.Context, mustBuild map[ProjectKey]bool, key ProjectKey, builtModules map[string]*schema.Module, schemas chan *schema.Module, callback buildCallback) error {
logger := log.FromContext(ctx)
if !mustBuild[key] {
return e.mustSchema(ctx, key, builtModules, schemas)
}

project, ok := e.projects[key]
if !ok {
return fmt.Errorf("project %q not found", key)
}

for _, dep := range project.Config().Dependencies {
if _, ok := builtModules[dep]; !ok {
logger.Warnf("%q build skipped because its dependency %q failed to build", key, dep)
return nil
}
}

err := e.build(ctx, key, builtModules, schemas)
if err == nil && callback != nil {
return callback(ctx, e.projects[key])
}

return err
}

// Publish either the schema from the FTL controller, or from a local build.
func (e *Engine) mustSchema(ctx context.Context, key ProjectKey, builtModules map[string]*schema.Module, schemas chan<- *schema.Module) error {
if sch, ok := e.controllerSchema.Load(string(key)); ok {
Expand Down