Skip to content

Commit

Permalink
fix: skip out of date file changes (#1279)
Browse files Browse the repository at this point in the history
Fixes #1210 

This required a bit of a refactor to use pointers for `Module` and
`ExternalLibrary` here. Hopefully, that's ok.

Example of rapid changes getting skipped. Note that I logged skips in
`Warnf` just to highlight for this screenshot. Real skip messages are
logged at `Debugf`

![Screenshot 2024-04-16 at 2 03
09 PM](https://github.com/TBD54566975/ftl/assets/51647/079878c6-3fa6-4f4c-9705-7a422e639db6)
  • Loading branch information
wesbillman authored Apr 16, 2024
1 parent 053c471 commit 1f04045
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 33 deletions.
2 changes: 1 addition & 1 deletion buildengine/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func extractKotlinFTLImports(self, dir string) ([]string, error) {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
matches := kotlinImportRegex.FindStringSubmatch(scanner.Text())
if matches != nil && len(matches) > 1 {
if len(matches) > 1 {
module := strings.Split(matches[1], ".")[0]
if module == self {
continue
Expand Down
79 changes: 48 additions & 31 deletions buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type schemaChange struct {
*schema.Module
}

type projectMeta struct {
project Project
lastBuildStartTime time.Time
}

type Listener interface {
OnBuildStarted(project Project)
}
Expand All @@ -43,7 +48,7 @@ func (b BuildStartedListenerFunc) OnBuildStarted(project Project) { b(project) }
// Engine for building a set of modules.
type Engine struct {
client ftlv1connect.ControllerServiceClient
projects map[ProjectKey]Project
projectMetas map[ProjectKey]projectMeta
moduleDirs []string
externalDirs []string
controllerSchema *xsync.MapOf[string, *schema.Module]
Expand Down Expand Up @@ -82,7 +87,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
client: client,
moduleDirs: moduleDirs,
externalDirs: externalDirs,
projects: map[ProjectKey]Project{},
projectMetas: map[ProjectKey]projectMeta{},
controllerSchema: xsync.NewMapOf[string, *schema.Module](),
schemaChanges: pubsub.New[schemaChange](),
parallelism: runtime.NumCPU(),
Expand All @@ -104,7 +109,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
if err != nil {
return nil, err
}
e.projects[project.Config().Key] = project
e.projectMetas[project.Config().Key] = projectMeta{project: project}
e.projectsToBuild.Store(project.Config().Key, true)
}

Expand Down Expand Up @@ -167,7 +172,7 @@ func (e *Engine) Close() error {
func (e *Engine) Graph(projects ...ProjectKey) (map[string][]string, error) {
out := map[string][]string{}
if len(projects) == 0 {
projects = maps.Keys(e.projects)
projects = maps.Keys(e.projectMetas)
}
for _, key := range projects {
if err := e.buildGraph(string(key), out); err != nil {
Expand All @@ -179,8 +184,8 @@ func (e *Engine) Graph(projects ...ProjectKey) (map[string][]string, error) {

func (e *Engine) buildGraph(key string, out map[string][]string) error {
var deps []string
if project, ok := e.projects[ProjectKey(key)]; ok {
deps = project.Config().Dependencies
if meta, ok := e.projectMetas[ProjectKey(key)]; ok {
deps = meta.project.Config().Dependencies
} else if sch, ok := e.controllerSchema.Load(key); ok {
deps = sch.Imports()
} else {
Expand Down Expand Up @@ -267,8 +272,8 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
switch event := event.(type) {
case WatchEventProjectAdded:
config := event.Project.Config()
if _, exists := e.projects[config.Key]; !exists {
e.projects[config.Key] = event.Project
if _, exists := e.projectMetas[config.Key]; !exists {
e.projectMetas[config.Key] = projectMeta{project: event.Project}
err := e.buildAndDeploy(ctx, 1, true, config.Key)
if err != nil {
logger.Errorf(err, "deploy %s failed", config.Key)
Expand All @@ -282,9 +287,15 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
logger.Errorf(err, "terminate %s failed", module.Module)
}
}
delete(e.projects, config.Key)
delete(e.projectMetas, config.Key)
case WatchEventProjectChanged:
config := event.Project.Config()

lastBuildTime := e.projectMetas[config.Key].lastBuildStartTime
if event.Time.Before(lastBuildTime) {
logger.Warnf("Skipping build and deploy; event time %v is before the last build time %v", event.Time, lastBuildTime)
continue // Skip this event as it's outdated
}
err := e.buildAndDeploy(ctx, 1, true, config.Key)
if err != nil {
switch project := event.Project.(type) {
Expand All @@ -293,7 +304,6 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
case ExternalLibrary:
logger.Errorf(err, "build failed for library %q: %v", project.Config().Key, err)
}

}
}
case change := <-schemaChanges:
Expand Down Expand Up @@ -339,8 +349,8 @@ func computeModuleHash(module *schema.Module) ([]byte, error) {

func (e *Engine) getDependentProjectKeys(name string) []ProjectKey {
dependentProjectKeys := map[ProjectKey]bool{}
for k, project := range e.projects {
for _, dep := range project.Config().Dependencies {
for k, meta := range e.projectMetas {
for _, dep := range meta.project.Config().Dependencies {
if dep == name {
dependentProjectKeys[k] = true
}
Expand All @@ -352,7 +362,7 @@ func (e *Engine) getDependentProjectKeys(name string) []ProjectKey {
func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, projects ...ProjectKey) error {
logger := log.FromContext(ctx)
if len(projects) == 0 {
projects = maps.Keys(e.projects)
projects = maps.Keys(e.projectMetas)
}

buildGroup := errgroup.Group{}
Expand All @@ -362,7 +372,12 @@ func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDepl
return e.buildWithCallback(ctx, func(buildCtx context.Context, builtProject Project) error {
deployGroup.Go(func() error {
e.projectsToBuild.Store(builtProject.Config().Key, false)
return Deploy(buildCtx, builtProject.(Module), replicas, waitForDeployOnline, e.client) //nolint:forcetypeassert
module, ok := builtProject.(Module)
if !ok {
// Skip deploying external libraries
return nil
}
return Deploy(buildCtx, module, replicas, waitForDeployOnline, e.client)
})
return nil
}, projects...)
Expand Down Expand Up @@ -397,20 +412,20 @@ type buildCallback func(ctx context.Context, project Project) error
func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, projects ...ProjectKey) error {
mustBuild := map[ProjectKey]bool{}
if len(projects) == 0 {
projects = maps.Keys(e.projects)
projects = maps.Keys(e.projectMetas)
}
for _, key := range projects {
project, ok := e.projects[key]
meta, ok := e.projectMetas[key]
if !ok {
return fmt.Errorf("project %q not found", key)
}
// Update dependencies before building.
var err error
project, err = UpdateDependencies(ctx, project)
project, err := UpdateDependencies(ctx, meta.project)
if err != nil {
return err
}
e.projects[key] = project
e.projectMetas[key] = projectMeta{project: project}
mustBuild[key] = true
}
graph, err := e.Graph(projects...)
Expand Down Expand Up @@ -473,21 +488,23 @@ func (e *Engine) tryBuild(ctx context.Context, mustBuild map[ProjectKey]bool, ke
return e.mustSchema(ctx, key, builtModules, schemas)
}

project, ok := e.projects[key]
meta, ok := e.projectMetas[key]
if !ok {
return fmt.Errorf("project %q not found", key)
}

for _, dep := range project.Config().Dependencies {
for _, dep := range meta.project.Config().Dependencies {
if _, ok := builtModules[dep]; !ok {
logger.Warnf("%q build skipped because its dependency %q failed to build", key, dep)
return nil
}
}

meta.lastBuildStartTime = time.Now()
e.projectMetas[key] = meta
err := e.build(ctx, key, builtModules, schemas)
if err == nil && callback != nil {
return callback(ctx, e.projects[key])
return callback(ctx, e.projectMetas[key].project)
}

return err
Expand All @@ -506,25 +523,25 @@ func (e *Engine) mustSchema(ctx context.Context, key ProjectKey, builtModules ma
//
// Assumes that all dependencies have been built and are available in "built".
func (e *Engine) build(ctx context.Context, key ProjectKey, builtModules map[string]*schema.Module, schemas chan<- *schema.Module) error {
project, ok := e.projects[key]
meta, ok := e.projectMetas[key]
if !ok {
return fmt.Errorf("project %q not found", key)
}

combined := map[string]*schema.Module{}
if err := e.gatherSchemas(builtModules, project, combined); err != nil {
if err := e.gatherSchemas(builtModules, meta.project, combined); err != nil {
return err
}
sch := &schema.Schema{Modules: maps.Values(combined)}

if e.listener != nil {
e.listener.OnBuildStarted(project)
e.listener.OnBuildStarted(meta.project)
}
err := Build(ctx, sch, project)
err := Build(ctx, sch, meta.project)
if err != nil {
return err
}
if module, ok := project.(Module); ok {
if module, ok := meta.project.(Module); ok {
moduleSchema, err := schema.ModuleFromProtoFile(filepath.Join(module.Dir, module.DeployDir, module.Schema))
if err != nil {
return fmt.Errorf("could not load schema for module %q: %w", module.Config().Key, err)
Expand All @@ -540,18 +557,18 @@ func (e *Engine) gatherSchemas(
project Project,
out map[string]*schema.Module,
) error {
latestModule, ok := e.projects[project.Config().Key]
latestModule, ok := e.projectMetas[project.Config().Key]
if !ok {
latestModule = project
latestModule = projectMeta{project: project}
}
for _, dep := range latestModule.Config().Dependencies {
for _, dep := range latestModule.project.Config().Dependencies {
out[dep] = moduleSchemas[dep]
if dep != "builtin" {
depModule, ok := e.projects[ProjectKey(dep)]
depModule, ok := e.projectMetas[ProjectKey(dep)]
// TODO: should we be gathering schemas from dependencies without a project?
// This can happen if the schema is loaded from the controller
if ok {
if err := e.gatherSchemas(moduleSchemas, depModule, out); err != nil {
if err := e.gatherSchemas(moduleSchemas, depModule.project, out); err != nil {
return err
}
}
Expand Down
3 changes: 2 additions & 1 deletion buildengine/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type WatchEventProjectChanged struct {
Project Project
Change FileChangeType
Path string
Time time.Time
}

func (WatchEventProjectChanged) watchEvent() {}
Expand Down Expand Up @@ -86,7 +87,7 @@ func Watch(ctx context.Context, period time.Duration, moduleDirs []string, exter
continue
}
logger.Debugf("changed %s %q: %c%s", project.TypeString(), project.Config().Key, changeType, path)
topic.Publish(WatchEventProjectChanged{Project: existingProject.Project, Change: changeType, Path: path})
topic.Publish(WatchEventProjectChanged{Project: existingProject.Project, Change: changeType, Path: path, Time: time.Now()})
existingProjects[config.Dir] = projectHashes{Hashes: hashes, Project: existingProject.Project}
continue
}
Expand Down

0 comments on commit 1f04045

Please sign in to comment.