From 6249b9d4bc06adfab7ba965446b45b770cfd7a4c Mon Sep 17 00:00:00 2001 From: "malik.hadjri" Date: Mon, 16 Dec 2024 16:24:19 -0500 Subject: [PATCH 1/4] DEVPROD-10103 Account for cross-build dependents in NumDependents --- model/lifecycle.go | 49 ++++++++------- model/lifecycle_test.go | 6 +- model/patch_lifecycle.go | 1 + model/task/db.go | 2 +- model/task/task.go | 4 +- repotracker/repotracker.go | 2 + repotracker/repotracker_test.go | 107 ++++++++++++++++++++++++++++++++ 7 files changed, 141 insertions(+), 30 deletions(-) diff --git a/model/lifecycle.go b/model/lifecycle.go index e1515ba0c36..ccade61b11a 100644 --- a/model/lifecycle.go +++ b/model/lifecycle.go @@ -502,10 +502,6 @@ func addTasksToBuild(ctx context.Context, creationInfo TaskCreationInfo) (*build return nil, nil, errors.Wrapf(err, "creating tasks for build '%s'", creationInfo.Build.Id) } - if err = tasks.InsertUnordered(ctx); err != nil { - return nil, nil, errors.Wrapf(err, "inserting tasks for build '%s'", creationInfo.Build.Id) - } - var hasGitHubCheck bool var hasUnfinishedEssentialTask bool for _, t := range tasks { @@ -525,11 +521,6 @@ func addTasksToBuild(ctx context.Context, creationInfo TaskCreationInfo) (*build return nil, nil, errors.Wrapf(err, "setting build '%s' as having an unfinished essential task", creationInfo.Build.Id) } - // update the build to hold the new tasks - if err = RefreshTasksCache(creationInfo.Build.Id); err != nil { - return nil, nil, errors.Wrapf(err, "updating task cache for '%s'", creationInfo.Build.Id) - } - batchTimeTaskStatuses := []BatchTimeTaskStatus{} tasksWithActivationTime := creationInfo.ActivationInfo.getActivationTasks(creationInfo.Build.BuildVariant) batchTimeCatcher := grip.NewBasicCatcher() @@ -862,10 +853,6 @@ func createTasksForBuild(ctx context.Context, creationInfo TaskCreationInfo) (ta tasks = append(tasks, t) } - // Set the NumDependents field - // Existing tasks in the db and tasks in other builds are not updated - setNumDeps(tasks) - sort.Stable(tasks) // return all of the tasks created @@ -975,28 +962,27 @@ func shouldSyncTask(syncVariantsTasks []patch.VariantTasks, bv, task string) boo return false } -// setNumDeps sets NumDependents for each task in tasks. +// SetNumDependents sets NumDependents for each task in tasks. // NumDependents is the number of tasks depending on the task. Only tasks created at the same time // and in the same variant are included. -func setNumDeps(tasks []*task.Task) { +func SetNumDependents(tasks []*task.Task) { idToTask := make(map[string]*task.Task) for i, task := range tasks { idToTask[task.Id] = tasks[i] } deduplicatedTasks := []*task.Task{} for _, task := range idToTask { - task.NumDependents = 0 deduplicatedTasks = append(deduplicatedTasks, task) } for _, task := range deduplicatedTasks { // Recursively find all tasks that task depends on and increments their NumDependents field - setNumDepsRec(task, idToTask, make(map[string]bool)) + setNumDependentsRec(task, idToTask, make(map[string]bool)) } } -// setNumDepsRec recursively finds all tasks that task depends on and increments their NumDependents field. +// setNumDependentsRec recursively finds all tasks that task depends on and increments their NumDependents field. // tasks not in idToTasks are not affected. -func setNumDepsRec(t *task.Task, idToTasks map[string]*task.Task, seen map[string]bool) { +func setNumDependentsRec(t *task.Task, idToTasks map[string]*task.Task, seen map[string]bool) { for _, dep := range t.DependsOn { // Check whether this dependency is included in the tasks we're currently creating depTask, ok := idToTasks[dep.TaskId] @@ -1008,7 +994,7 @@ func setNumDepsRec(t *task.Task, idToTasks map[string]*task.Task, seen map[strin if !seen[depTask.Id] { seen[depTask.Id] = true depTask.NumDependents = depTask.NumDependents + 1 - setNumDepsRec(depTask, idToTasks, seen) + setNumDependentsRec(depTask, idToTasks, seen) } } } @@ -1052,7 +1038,7 @@ func RecomputeNumDependents(ctx context.Context, t task.Task) error { taskPtrs = append(taskPtrs, &versionTasks[i]) } - setNumDeps(taskPtrs) + SetNumDependents(taskPtrs) catcher := grip.NewBasicCatcher() for _, t := range taskPtrs { catcher.Add(t.SetNumDependents()) @@ -1532,6 +1518,7 @@ func addNewBuilds(ctx context.Context, creationInfo TaskCreationInfo, existingBu newActivatedTasks := []task.Task{} newBuildStatuses := make([]VersionBuildStatus, 0) numEstimatedActivatedGeneratedTasks := 0 + allTasks := task.Tasks{} variantsProcessed := map[string]bool{} for _, b := range existingBuilds { @@ -1588,12 +1575,10 @@ func addNewBuilds(ctx context.Context, creationInfo TaskCreationInfo, existingBu continue } + allTasks = append(allTasks, tasks...) if err = build.Insert(); err != nil { return nil, errors.Wrapf(err, "inserting build '%s'", build.Id) } - if err = tasks.InsertUnordered(ctx); err != nil { - return nil, errors.Wrapf(err, "inserting tasks for build '%s'", build.Id) - } newBuildIds = append(newBuildIds, build.Id) batchTimeTasksToIds := map[string]string{} @@ -1639,6 +1624,10 @@ func addNewBuilds(ctx context.Context, creationInfo TaskCreationInfo, existingBu }, ) } + SetNumDependents(allTasks) + if err = allTasks.InsertUnordered(ctx); err != nil { + return nil, errors.Wrap(err, "inserting tasks") + } numTasksModified := numEstimatedActivatedGeneratedTasks + len(newActivatedTaskIds) if err = task.UpdateSchedulingLimit(creationInfo.Version.Author, creationInfo.Version.Requester, numTasksModified, true); err != nil { return nil, errors.Wrapf(err, "fetching user '%s' and updating their scheduling limit", creationInfo.Version.Author) @@ -1694,6 +1683,7 @@ func addNewTasksToExistingBuilds(ctx context.Context, creationInfo TaskCreationI activatedTaskIds := []string{} activatedTasks := []task.Task{} + allTasks := task.Tasks{} var buildIdsToActivate []string for _, b := range existingBuilds { wasActivated := b.Activated @@ -1746,6 +1736,7 @@ func addNewTasksToExistingBuilds(ctx context.Context, creationInfo TaskCreationI return nil, err } + allTasks = append(allTasks, tasks...) for _, t := range tasks { if t.Activated { activatedTaskIds = append(activatedTaskIds, t.Id) @@ -1761,6 +1752,16 @@ func addNewTasksToExistingBuilds(ctx context.Context, creationInfo TaskCreationI buildIdsToActivate = append(buildIdsToActivate, b.Id) } } + if err = allTasks.InsertUnordered(ctx); err != nil { + return nil, errors.Wrap(err, "inserting tasks") + } + SetNumDependents(allTasks) + // update each build to hold the new tasks + for _, b := range existingBuilds { + if err = RefreshTasksCache(creationInfo.Build.Id); err != nil { + return nil, errors.Wrapf(err, "updating task cache for '%s'", b.Id) + } + } if err = task.CheckUsersPatchTaskLimit(creationInfo.Version.Requester, creationInfo.Version.Author, false, activatedTasks...); err != nil { return nil, errors.Wrap(err, "updating patch task limit for user") } diff --git a/model/lifecycle_test.go b/model/lifecycle_test.go index c7daf8afe10..5ca7281e473 100644 --- a/model/lifecycle_test.go +++ b/model/lifecycle_test.go @@ -1940,8 +1940,8 @@ func TestDeletingBuild(t *testing.T) { }) } -func TestSetNumDeps(t *testing.T) { - Convey("setNumDeps correctly sets NumDependents for each task", t, func() { +func TestSetNumDependents(t *testing.T) { + Convey("SetNumDependents correctly sets NumDependents for each task", t, func() { tasks := []*task.Task{ {Id: "task1"}, { @@ -1957,7 +1957,7 @@ func TestSetNumDeps(t *testing.T) { DependsOn: []task.Dependency{{TaskId: "task2"}, {TaskId: "task3"}, {TaskId: "not_here"}}, }, } - setNumDeps(tasks) + SetNumDependents(tasks) So(len(tasks), ShouldEqual, 4) So(tasks[0].NumDependents, ShouldEqual, 3) So(tasks[1].NumDependents, ShouldEqual, 1) diff --git a/model/patch_lifecycle.go b/model/patch_lifecycle.go index 846daa7f09a..47fa7dd21d4 100644 --- a/model/patch_lifecycle.go +++ b/model/patch_lifecycle.go @@ -738,6 +738,7 @@ func FinalizePatch(ctx context.Context, p *patch.Patch, requester string) (*Vers }, ) } + SetNumDependents(tasksToInsert) numActivatedTasks := 0 for _, t := range tasksToInsert { if t.Activated { diff --git a/model/task/db.go b/model/task/db.go index 47e46f9bd62..395680d0428 100644 --- a/model/task/db.go +++ b/model/task/db.go @@ -63,7 +63,7 @@ var ( DependsOnKey = bsonutil.MustHaveTag(Task{}, "DependsOn") UnattainableDependencyKey = bsonutil.MustHaveTag(Task{}, "UnattainableDependency") OverrideDependenciesKey = bsonutil.MustHaveTag(Task{}, "OverrideDependencies") - NumDepsKey = bsonutil.MustHaveTag(Task{}, "NumDependents") + NumDependentsKey = bsonutil.MustHaveTag(Task{}, "NumDependents") DisplayNameKey = bsonutil.MustHaveTag(Task{}, "DisplayName") ExecutionPlatformKey = bsonutil.MustHaveTag(Task{}, "ExecutionPlatform") HostIdKey = bsonutil.MustHaveTag(Task{}, "HostId") diff --git a/model/task/task.go b/model/task/task.go index 4994e8b889d..1fe31c7d3ee 100644 --- a/model/task/task.go +++ b/model/task/task.go @@ -3978,12 +3978,12 @@ func (t *Task) SetCheckRunId(checkRunId int64) error { func (t *Task) SetNumDependents() error { update := bson.M{ "$set": bson.M{ - NumDepsKey: t.NumDependents, + NumDependentsKey: t.NumDependents, }, } if t.NumDependents == 0 { update = bson.M{"$unset": bson.M{ - NumDepsKey: "", + NumDependentsKey: "", }} } return UpdateOne(bson.M{ diff --git a/repotracker/repotracker.go b/repotracker/repotracker.go index 11fa06af13b..cc4d9adf985 100644 --- a/repotracker/repotracker.go +++ b/repotracker/repotracker.go @@ -980,6 +980,8 @@ func createVersionItems(ctx context.Context, v *model.Version, metadata model.Ve }) } + model.SetNumDependents(tasksToCreate) + grip.ErrorWhen(len(buildsToCreate) == 0, message.Fields{ "message": "version has no builds", "version": v.Id, diff --git a/repotracker/repotracker_test.go b/repotracker/repotracker_test.go index 538eae9267f..7c5371ca0b5 100644 --- a/repotracker/repotracker_test.go +++ b/repotracker/repotracker_test.go @@ -274,6 +274,113 @@ func TestStoreRepositoryRevisions(t *testing.T) { }) } +func TestCountNumDependentsAcrossVariants(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + assert.NoError(t, db.ClearCollections(model.VersionCollection, distro.Collection, model.ParserProjectCollection, + build.Collection, task.Collection, model.ProjectConfigCollection, model.ProjectRefCollection)) + + simpleYml := ` +buildvariants: +- name: bv1 + display_name: "bv_display" + run_on: d1 + tasks: + - name: t1 + - name: t2 + - name: t3 + - name: t4 +- name: bv2 + display_name: bv2_display + run_on: d2 + tasks: + - name: t1 +- name: bv3 + display_name: bv3_display + run_on: d2 + tasks: + - name: t4 + depends_on: + - name: t1 + variant: bv1 +tasks: +- name: t1 +- name: t2 + depends_on: + - name: t1 +- name: t3 + depends_on: + - name: t1 +- name: t4 + depends_on: + - name: t2 + - name: t3 +` + previouslyActivatedVersion := &model.Version{ + Id: "previously activated", + Identifier: "testproject", + Requester: evergreen.RepotrackerVersionRequester, + BuildVariants: []model.VersionBuildStatus{ + { + BuildVariant: "bv1", + BatchTimeTasks: []model.BatchTimeTaskStatus{ + { + TaskName: "t1", + ActivationStatus: model.ActivationStatus{ + Activated: true, + ActivateAt: time.Now().Add(-11 * time.Minute), + }, + }, + }, + ActivationStatus: model.ActivationStatus{ + Activated: true, + ActivateAt: time.Now().Add(-11 * time.Minute), + }, + }, + { + BuildVariant: "bv2", + ActivationStatus: model.ActivationStatus{ + Activated: true, + ActivateAt: time.Now().Add(-11 * time.Minute), + }, + }, + }, + } + assert.NoError(t, previouslyActivatedVersion.Insert()) + + // insert distros used in testing. + d := distro.Distro{Id: "d1"} + assert.NoError(t, d.Insert(ctx)) + d.Id = "d2" + assert.NoError(t, d.Insert(ctx)) + + pRef := &model.ProjectRef{ + Id: "testproject", + BatchTime: 0, + } + require.NoError(t, pRef.Insert()) + + p := &model.Project{} + pp, err := model.LoadProjectInto(ctx, []byte(simpleYml), nil, "testproject", p) + assert.NoError(t, err) + + // create new version to use for activating + revisions := []model.Revision{ + *createTestRevision("yes", time.Now()), + } + repoTracker := RepoTracker{ + testConfig, + pRef, + NewMockRepoPoller(pp, revisions), + } + assert.NoError(t, repoTracker.StoreRevisions(ctx, revisions)) + + bv1t1, err := task.FindOne(db.Query(bson.M{task.BuildVariantKey: "bv1", task.DisplayNameKey: "t1"})) + require.NoError(t, err) + require.NotNil(t, bv1t1) + assert.Equal(t, bv1t1.NumDependents, 4) +} + func TestBatchTimeForTasks(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 7ac003c9a442da6201114da0d94db0482b92af7c Mon Sep 17 00:00:00 2001 From: "malik.hadjri" Date: Tue, 17 Dec 2024 17:22:39 -0500 Subject: [PATCH 2/4] Fix tests --- model/lifecycle.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/model/lifecycle.go b/model/lifecycle.go index ccade61b11a..efa4343602c 100644 --- a/model/lifecycle.go +++ b/model/lifecycle.go @@ -1029,8 +1029,12 @@ func RecomputeNumDependents(ctx context.Context, t task.Task) error { for i := range depTasks { taskPtrs = append(taskPtrs, &depTasks[i]) } - - versionTasks, err := task.FindAll(db.Query(task.ByVersion(t.Version))) + query := task.ByVersion(t.Version) + _, err = task.UpdateAll(query, bson.M{"$set": bson.M{task.NumDependentsKey: 0}}) + if err != nil { + return errors.Wrap(err, "resetting num dependents") + } + versionTasks, err := task.FindAll(db.Query(query)) if err != nil { return errors.Wrap(err, "getting tasks in version") } @@ -1758,7 +1762,7 @@ func addNewTasksToExistingBuilds(ctx context.Context, creationInfo TaskCreationI SetNumDependents(allTasks) // update each build to hold the new tasks for _, b := range existingBuilds { - if err = RefreshTasksCache(creationInfo.Build.Id); err != nil { + if err = RefreshTasksCache(b.Id); err != nil { return nil, errors.Wrapf(err, "updating task cache for '%s'", b.Id) } } From 751af424f59850ddf06a3418c83aaa7e5e6bb802 Mon Sep 17 00:00:00 2001 From: "malik.hadjri" Date: Wed, 18 Dec 2024 16:14:56 -0500 Subject: [PATCH 3/4] Update comment --- model/lifecycle.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/model/lifecycle.go b/model/lifecycle.go index efa4343602c..a1276d675ea 100644 --- a/model/lifecycle.go +++ b/model/lifecycle.go @@ -963,8 +963,7 @@ func shouldSyncTask(syncVariantsTasks []patch.VariantTasks, bv, task string) boo } // SetNumDependents sets NumDependents for each task in tasks. -// NumDependents is the number of tasks depending on the task. Only tasks created at the same time -// and in the same variant are included. +// NumDependents is the number of tasks depending on the task. func SetNumDependents(tasks []*task.Task) { idToTask := make(map[string]*task.Task) for i, task := range tasks { From 0418eda552b051b4914ec87a6a783c79127badc2 Mon Sep 17 00:00:00 2001 From: "malik.hadjri" Date: Thu, 19 Dec 2024 13:30:51 -0500 Subject: [PATCH 4/4] CR --- model/lifecycle.go | 2 +- model/patch_lifecycle.go | 1 + repotracker/repotracker.go | 2 +- repotracker/repotracker_test.go | 19 +++++++------------ 4 files changed, 10 insertions(+), 14 deletions(-) diff --git a/model/lifecycle.go b/model/lifecycle.go index a1276d675ea..9f08c6891a9 100644 --- a/model/lifecycle.go +++ b/model/lifecycle.go @@ -1755,10 +1755,10 @@ func addNewTasksToExistingBuilds(ctx context.Context, creationInfo TaskCreationI buildIdsToActivate = append(buildIdsToActivate, b.Id) } } + SetNumDependents(allTasks) if err = allTasks.InsertUnordered(ctx); err != nil { return nil, errors.Wrap(err, "inserting tasks") } - SetNumDependents(allTasks) // update each build to hold the new tasks for _, b := range existingBuilds { if err = RefreshTasksCache(b.Id); err != nil { diff --git a/model/patch_lifecycle.go b/model/patch_lifecycle.go index 47fa7dd21d4..1a15eef1663 100644 --- a/model/patch_lifecycle.go +++ b/model/patch_lifecycle.go @@ -738,6 +738,7 @@ func FinalizePatch(ctx context.Context, p *patch.Patch, requester string) (*Vers }, ) } + // We must set the NumDependents field for tasks prior to inserting them in the DB. SetNumDependents(tasksToInsert) numActivatedTasks := 0 for _, t := range tasksToInsert { diff --git a/repotracker/repotracker.go b/repotracker/repotracker.go index cc4d9adf985..dd3c13a63c6 100644 --- a/repotracker/repotracker.go +++ b/repotracker/repotracker.go @@ -979,7 +979,7 @@ func createVersionItems(ctx context.Context, v *model.Version, metadata model.Ve }, }) } - + // We must set the NumDependents field for tasks prior to inserting them in the DB. model.SetNumDependents(tasksToCreate) grip.ErrorWhen(len(buildsToCreate) == 0, message.Fields{ diff --git a/repotracker/repotracker_test.go b/repotracker/repotracker_test.go index 7c5371ca0b5..980799b1fb7 100644 --- a/repotracker/repotracker_test.go +++ b/repotracker/repotracker_test.go @@ -84,7 +84,6 @@ func TestStoreRepositoryRevisions(t *testing.T) { So(err, ShouldBeNil) repoTracker := RepoTracker{testConfig, evgProjectRef, NewGithubRepositoryPoller(evgProjectRef)} - // insert distros used in testing. d := distro.Distro{Id: "test-distro-one"} So(d.Insert(ctx), ShouldBeNil) d.Id = "test-distro-two" @@ -191,7 +190,6 @@ func TestStoreRepositoryRevisions(t *testing.T) { poller, } - // insert distros used in testing. d := distro.Distro{Id: "test-distro-one"} So(d.Insert(ctx), ShouldBeNil) d.Id = "test-distro-two" @@ -277,7 +275,7 @@ func TestStoreRepositoryRevisions(t *testing.T) { func TestCountNumDependentsAcrossVariants(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - assert.NoError(t, db.ClearCollections(model.VersionCollection, distro.Collection, model.ParserProjectCollection, + require.NoError(t, db.ClearCollections(model.VersionCollection, distro.Collection, model.ParserProjectCollection, build.Collection, task.Collection, model.ProjectConfigCollection, model.ProjectRefCollection)) simpleYml := ` @@ -346,13 +344,12 @@ tasks: }, }, } - assert.NoError(t, previouslyActivatedVersion.Insert()) + require.NoError(t, previouslyActivatedVersion.Insert()) - // insert distros used in testing. d := distro.Distro{Id: "d1"} - assert.NoError(t, d.Insert(ctx)) + require.NoError(t, d.Insert(ctx)) d.Id = "d2" - assert.NoError(t, d.Insert(ctx)) + require.NoError(t, d.Insert(ctx)) pRef := &model.ProjectRef{ Id: "testproject", @@ -363,8 +360,9 @@ tasks: p := &model.Project{} pp, err := model.LoadProjectInto(ctx, []byte(simpleYml), nil, "testproject", p) assert.NoError(t, err) + require.NotNil(t, pp) - // create new version to use for activating + // Create new version to use for activating revisions := []model.Revision{ *createTestRevision("yes", time.Now()), } @@ -452,7 +450,6 @@ tasks: } assert.NoError(t, previouslyActivatedVersion.Insert()) - // insert distros used in testing. d := distro.Distro{Id: "d1"} assert.NoError(t, d.Insert(ctx)) d.Id = "d2" @@ -468,7 +465,7 @@ tasks: pp, err := model.LoadProjectInto(ctx, []byte(simpleYml), nil, "testproject", p) assert.NoError(t, err) - // create new version to use for activating + // Create new version to use for activating revisions := []model.Revision{ *createTestRevision("yes", time.Now()), } @@ -586,7 +583,6 @@ func TestBatchTimes(t *testing.T) { So(previouslyActivatedVersion.Insert(), ShouldBeNil) - // insert distros used in testing. d := distro.Distro{Id: "test-distro-one"} So(d.Insert(ctx), ShouldBeNil) d.Id = "test-distro-two" @@ -752,7 +748,6 @@ func TestBatchTimes(t *testing.T) { Requester: evergreen.RepotrackerVersionRequester, } So(previouslyActivatedVersion.Insert(), ShouldBeNil) - // insert distros used in testing. d := distro.Distro{Id: "test-distro-one"} So(d.Insert(ctx), ShouldBeNil) d.Id = "test-distro-two"