From 0ef1803246bdd8111f979e1f670a6ba6ae59dc60 Mon Sep 17 00:00:00 2001 From: Wes Date: Wed, 10 Apr 2024 08:56:34 -0700 Subject: [PATCH] fix: build errors canceling deploy contexts --- buildengine/engine.go | 134 +++++++++++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 49 deletions(-) diff --git a/buildengine/engine.go b/buildengine/engine.go index 9cb9ec452..668285e07 100644 --- a/buildengine/engine.go +++ b/buildengine/engine.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/sha256" + "errors" "fmt" "path/filepath" "runtime" @@ -45,12 +46,12 @@ type Engine struct { projects map[ProjectKey]Project moduleDirs []string externalDirs []string - commands map[string]string controllerSchema *xsync.MapOf[string, *schema.Module] schemaChanges *pubsub.Topic[schemaChange] cancel func() parallelism int listener Listener + projectsToBuild *xsync.MapOf[ProjectKey, bool] } type Option func(o *Engine) @@ -85,6 +86,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul controllerSchema: xsync.NewMapOf[string, *schema.Module](), schemaChanges: pubsub.New[schemaChange](), parallelism: runtime.NumCPU(), + projectsToBuild: xsync.NewMapOf[ProjectKey, bool](), } for _, option := range options { option(e) @@ -103,6 +105,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul return nil, err } e.projects[project.Config().Key] = project + e.projectsToBuild.Store(project.Config().Key, true) } if client == nil { @@ -240,8 +243,10 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration err := e.buildAndDeploy(ctx, 1, true) if err != nil { logger.Errorf(err, "initial deploy failed") + } else { + logger.Infof("All modules deployed, watching for changes...") } - logger.Infof("All modules deployed, watching for changes...") + moduleHashes := map[string][]byte{} e.controllerSchema.Range(func(name string, sch *schema.Module) bool { hash, err := computeModuleHash(sch) @@ -319,7 +324,6 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration } } } - } } @@ -346,52 +350,46 @@ func (e *Engine) getDependentProjectKeys(name string) []ProjectKey { } func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, projects ...ProjectKey) error { + logger := log.FromContext(ctx) if len(projects) == 0 { projects = maps.Keys(e.projects) } - deployQueue := make(chan Project, len(projects)) - wg, ctx := errgroup.WithContext(ctx) + buildGroup := errgroup.Group{} + deployGroup := errgroup.Group{} - // Build all projects and enqueue the modules for deployment. - wg.Go(func() error { - defer close(deployQueue) - - return e.buildWithCallback(ctx, func(ctx context.Context, project Project) error { - if _, ok := project.(Module); ok { - select { - case deployQueue <- project: - return nil - case <-ctx.Done(): - return ctx.Err() - } - } + buildGroup.Go(func() error { + return e.buildWithCallback(ctx, func(buildCtx context.Context, builtProject Project) error { + deployGroup.Go(func() error { + e.projectsToBuild.Store(builtProject.Config().Key, false) + return Deploy(buildCtx, builtProject.(Module), replicas, waitForDeployOnline, e.client) //nolint:forcetypeassert + }) return nil }, projects...) }) - // Process deployment queue. - for range len(projects) { - wg.Go(func() error { - for { - select { - case project, ok := <-deployQueue: - if !ok { - return nil - } - if module, ok := project.(Module); ok { - err := Deploy(ctx, module, replicas, waitForDeployOnline, e.client) - if err != nil { - return err - } - } - case <-ctx.Done(): - return ctx.Err() - } - } - }) + // Wait for all build and deploy attempts to complete + buildErr := buildGroup.Wait() + deployErr := deployGroup.Wait() + + pendingInitialBuilds := []string{} + e.projectsToBuild.Range(func(key ProjectKey, value bool) bool { + if value { + pendingInitialBuilds = append(pendingInitialBuilds, string(key)) + } + return true + }) + + // Print out all modules that have yet to build if there are any errors + if len(pendingInitialBuilds) > 0 { + logger.Infof("Modules waiting to build: %s", strings.Join(pendingInitialBuilds, ", ")) + } + + if buildErr != nil { + return buildErr } - return wg.Wait() + + return deployErr } type buildCallback func(ctx context.Context, project Project) error @@ -424,29 +422,31 @@ func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, } topology := TopologicalSort(graph) + + errCh := make(chan error, 1024) for _, group := range topology { // Collect schemas to be inserted into "built" map for subsequent groups. schemas := make(chan *schema.Module, len(group)) - wg, ctx := errgroup.WithContext(ctx) + + wg := errgroup.Group{} wg.SetLimit(e.parallelism) for _, keyStr := range group { key := ProjectKey(keyStr) - wg.Go(func() error { - if !mustBuild[key] { - return e.mustSchema(ctx, key, builtModules, schemas) - } - err := e.build(ctx, key, builtModules, schemas) - if err == nil && callback != nil { - return callback(ctx, e.projects[key]) + wg.Go(func() error { + err := e.tryBuild(ctx, mustBuild, key, builtModules, schemas, callback) + if err != nil { + errCh <- err } - return err + return nil }) } - err := wg.Wait() + + err = wg.Wait() if err != nil { return err } + // Now this group is built, collect all the schemas. close(schemas) for sch := range schemas { @@ -454,9 +454,45 @@ func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, } } + close(errCh) + allErrors := []error{} + for err := range errCh { + allErrors = append(allErrors, err) + } + + if len(allErrors) > 0 { + return errors.Join(allErrors...) + } + return nil } +func (e *Engine) tryBuild(ctx context.Context, mustBuild map[ProjectKey]bool, key ProjectKey, builtModules map[string]*schema.Module, schemas chan *schema.Module, callback buildCallback) error { + logger := log.FromContext(ctx) + if !mustBuild[key] { + return e.mustSchema(ctx, key, builtModules, schemas) + } + + project, ok := e.projects[key] + if !ok { + return fmt.Errorf("project %q not found", key) + } + + for _, dep := range project.Config().Dependencies { + if _, ok := builtModules[dep]; !ok { + logger.Warnf("%q build skipped because its dependency %q failed to build", key, dep) + return nil + } + } + + err := e.build(ctx, key, builtModules, schemas) + if err == nil && callback != nil { + return callback(ctx, e.projects[key]) + } + + return err +} + // Publish either the schema from the FTL controller, or from a local build. func (e *Engine) mustSchema(ctx context.Context, key ProjectKey, builtModules map[string]*schema.Module, schemas chan<- *schema.Module) error { if sch, ok := e.controllerSchema.Load(string(key)); ok {