Skip to content

Commit

Permalink
DEVPROD-10103 Account for cross-build dependents in NumDependents (#8560
Browse files Browse the repository at this point in the history
)
  • Loading branch information
hadjri authored Dec 20, 2024
1 parent 707d4e2 commit b462ffd
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 40 deletions.
60 changes: 32 additions & 28 deletions model/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,6 @@ func addTasksToBuild(ctx context.Context, creationInfo TaskCreationInfo) (*build
return nil, nil, errors.Wrapf(err, "creating tasks for build '%s'", creationInfo.Build.Id)
}

if err = tasks.InsertUnordered(ctx); err != nil {
return nil, nil, errors.Wrapf(err, "inserting tasks for build '%s'", creationInfo.Build.Id)
}

var hasGitHubCheck bool
var hasUnfinishedEssentialTask bool
for _, t := range tasks {
Expand All @@ -525,11 +521,6 @@ func addTasksToBuild(ctx context.Context, creationInfo TaskCreationInfo) (*build
return nil, nil, errors.Wrapf(err, "setting build '%s' as having an unfinished essential task", creationInfo.Build.Id)
}

// update the build to hold the new tasks
if err = RefreshTasksCache(creationInfo.Build.Id); err != nil {
return nil, nil, errors.Wrapf(err, "updating task cache for '%s'", creationInfo.Build.Id)
}

batchTimeTaskStatuses := []BatchTimeTaskStatus{}
tasksWithActivationTime := creationInfo.ActivationInfo.getActivationTasks(creationInfo.Build.BuildVariant)
batchTimeCatcher := grip.NewBasicCatcher()
Expand Down Expand Up @@ -862,10 +853,6 @@ func createTasksForBuild(ctx context.Context, creationInfo TaskCreationInfo) (ta
tasks = append(tasks, t)
}

// Set the NumDependents field
// Existing tasks in the db and tasks in other builds are not updated
setNumDeps(tasks)

sort.Stable(tasks)

// return all of the tasks created
Expand Down Expand Up @@ -975,28 +962,26 @@ func shouldSyncTask(syncVariantsTasks []patch.VariantTasks, bv, task string) boo
return false
}

// setNumDeps sets NumDependents for each task in tasks.
// NumDependents is the number of tasks depending on the task. Only tasks created at the same time
// and in the same variant are included.
func setNumDeps(tasks []*task.Task) {
// SetNumDependents sets NumDependents for each task in tasks.
// NumDependents is the number of tasks depending on the task.
func SetNumDependents(tasks []*task.Task) {
idToTask := make(map[string]*task.Task)
for i, task := range tasks {
idToTask[task.Id] = tasks[i]
}
deduplicatedTasks := []*task.Task{}
for _, task := range idToTask {
task.NumDependents = 0
deduplicatedTasks = append(deduplicatedTasks, task)
}
for _, task := range deduplicatedTasks {
// Recursively find all tasks that task depends on and increments their NumDependents field
setNumDepsRec(task, idToTask, make(map[string]bool))
setNumDependentsRec(task, idToTask, make(map[string]bool))
}
}

// setNumDepsRec recursively finds all tasks that task depends on and increments their NumDependents field.
// setNumDependentsRec recursively finds all tasks that task depends on and increments their NumDependents field.
// tasks not in idToTasks are not affected.
func setNumDepsRec(t *task.Task, idToTasks map[string]*task.Task, seen map[string]bool) {
func setNumDependentsRec(t *task.Task, idToTasks map[string]*task.Task, seen map[string]bool) {
for _, dep := range t.DependsOn {
// Check whether this dependency is included in the tasks we're currently creating
depTask, ok := idToTasks[dep.TaskId]
Expand All @@ -1008,7 +993,7 @@ func setNumDepsRec(t *task.Task, idToTasks map[string]*task.Task, seen map[strin
if !seen[depTask.Id] {
seen[depTask.Id] = true
depTask.NumDependents = depTask.NumDependents + 1
setNumDepsRec(depTask, idToTasks, seen)
setNumDependentsRec(depTask, idToTasks, seen)
}
}
}
Expand Down Expand Up @@ -1043,16 +1028,20 @@ func RecomputeNumDependents(ctx context.Context, t task.Task) error {
for i := range depTasks {
taskPtrs = append(taskPtrs, &depTasks[i])
}

versionTasks, err := task.FindAll(db.Query(task.ByVersion(t.Version)))
query := task.ByVersion(t.Version)
_, err = task.UpdateAll(query, bson.M{"$set": bson.M{task.NumDependentsKey: 0}})
if err != nil {
return errors.Wrap(err, "resetting num dependents")
}
versionTasks, err := task.FindAll(db.Query(query))
if err != nil {
return errors.Wrap(err, "getting tasks in version")
}
for i := range versionTasks {
taskPtrs = append(taskPtrs, &versionTasks[i])
}

setNumDeps(taskPtrs)
SetNumDependents(taskPtrs)
catcher := grip.NewBasicCatcher()
for _, t := range taskPtrs {
catcher.Add(t.SetNumDependents())
Expand Down Expand Up @@ -1532,6 +1521,7 @@ func addNewBuilds(ctx context.Context, creationInfo TaskCreationInfo, existingBu
newActivatedTasks := []task.Task{}
newBuildStatuses := make([]VersionBuildStatus, 0)
numEstimatedActivatedGeneratedTasks := 0
allTasks := task.Tasks{}

variantsProcessed := map[string]bool{}
for _, b := range existingBuilds {
Expand Down Expand Up @@ -1588,12 +1578,10 @@ func addNewBuilds(ctx context.Context, creationInfo TaskCreationInfo, existingBu
continue
}

allTasks = append(allTasks, tasks...)
if err = build.Insert(); err != nil {
return nil, errors.Wrapf(err, "inserting build '%s'", build.Id)
}
if err = tasks.InsertUnordered(ctx); err != nil {
return nil, errors.Wrapf(err, "inserting tasks for build '%s'", build.Id)
}
newBuildIds = append(newBuildIds, build.Id)

batchTimeTasksToIds := map[string]string{}
Expand Down Expand Up @@ -1639,6 +1627,10 @@ func addNewBuilds(ctx context.Context, creationInfo TaskCreationInfo, existingBu
},
)
}
SetNumDependents(allTasks)
if err = allTasks.InsertUnordered(ctx); err != nil {
return nil, errors.Wrap(err, "inserting tasks")
}
numTasksModified := numEstimatedActivatedGeneratedTasks + len(newActivatedTaskIds)
if err = task.UpdateSchedulingLimit(creationInfo.Version.Author, creationInfo.Version.Requester, numTasksModified, true); err != nil {
return nil, errors.Wrapf(err, "fetching user '%s' and updating their scheduling limit", creationInfo.Version.Author)
Expand Down Expand Up @@ -1694,6 +1686,7 @@ func addNewTasksToExistingBuilds(ctx context.Context, creationInfo TaskCreationI

activatedTaskIds := []string{}
activatedTasks := []task.Task{}
allTasks := task.Tasks{}
var buildIdsToActivate []string
for _, b := range existingBuilds {
wasActivated := b.Activated
Expand Down Expand Up @@ -1746,6 +1739,7 @@ func addNewTasksToExistingBuilds(ctx context.Context, creationInfo TaskCreationI
return nil, err
}

allTasks = append(allTasks, tasks...)
for _, t := range tasks {
if t.Activated {
activatedTaskIds = append(activatedTaskIds, t.Id)
Expand All @@ -1761,6 +1755,16 @@ func addNewTasksToExistingBuilds(ctx context.Context, creationInfo TaskCreationI
buildIdsToActivate = append(buildIdsToActivate, b.Id)
}
}
SetNumDependents(allTasks)
if err = allTasks.InsertUnordered(ctx); err != nil {
return nil, errors.Wrap(err, "inserting tasks")
}
// update each build to hold the new tasks
for _, b := range existingBuilds {
if err = RefreshTasksCache(b.Id); err != nil {
return nil, errors.Wrapf(err, "updating task cache for '%s'", b.Id)
}
}
if err = task.CheckUsersPatchTaskLimit(creationInfo.Version.Requester, creationInfo.Version.Author, false, activatedTasks...); err != nil {
return nil, errors.Wrap(err, "updating patch task limit for user")
}
Expand Down
6 changes: 3 additions & 3 deletions model/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1940,8 +1940,8 @@ func TestDeletingBuild(t *testing.T) {
})
}

func TestSetNumDeps(t *testing.T) {
Convey("setNumDeps correctly sets NumDependents for each task", t, func() {
func TestSetNumDependents(t *testing.T) {
Convey("SetNumDependents correctly sets NumDependents for each task", t, func() {
tasks := []*task.Task{
{Id: "task1"},
{
Expand All @@ -1957,7 +1957,7 @@ func TestSetNumDeps(t *testing.T) {
DependsOn: []task.Dependency{{TaskId: "task2"}, {TaskId: "task3"}, {TaskId: "not_here"}},
},
}
setNumDeps(tasks)
SetNumDependents(tasks)
So(len(tasks), ShouldEqual, 4)
So(tasks[0].NumDependents, ShouldEqual, 3)
So(tasks[1].NumDependents, ShouldEqual, 1)
Expand Down
2 changes: 2 additions & 0 deletions model/patch_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,8 @@ func FinalizePatch(ctx context.Context, p *patch.Patch, requester string) (*Vers
},
)
}
// We must set the NumDependents field for tasks prior to inserting them in the DB.
SetNumDependents(tasksToInsert)
numActivatedTasks := 0
for _, t := range tasksToInsert {
if t.Activated {
Expand Down
2 changes: 1 addition & 1 deletion model/task/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var (
DependsOnKey = bsonutil.MustHaveTag(Task{}, "DependsOn")
UnattainableDependencyKey = bsonutil.MustHaveTag(Task{}, "UnattainableDependency")
OverrideDependenciesKey = bsonutil.MustHaveTag(Task{}, "OverrideDependencies")
NumDepsKey = bsonutil.MustHaveTag(Task{}, "NumDependents")
NumDependentsKey = bsonutil.MustHaveTag(Task{}, "NumDependents")
DisplayNameKey = bsonutil.MustHaveTag(Task{}, "DisplayName")
ExecutionPlatformKey = bsonutil.MustHaveTag(Task{}, "ExecutionPlatform")
HostIdKey = bsonutil.MustHaveTag(Task{}, "HostId")
Expand Down
4 changes: 2 additions & 2 deletions model/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3982,12 +3982,12 @@ func (t *Task) SetCheckRunId(checkRunId int64) error {
func (t *Task) SetNumDependents() error {
update := bson.M{
"$set": bson.M{
NumDepsKey: t.NumDependents,
NumDependentsKey: t.NumDependents,
},
}
if t.NumDependents == 0 {
update = bson.M{"$unset": bson.M{
NumDepsKey: "",
NumDependentsKey: "",
}}
}
return UpdateOne(bson.M{
Expand Down
2 changes: 2 additions & 0 deletions repotracker/repotracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,8 @@ func createVersionItems(ctx context.Context, v *model.Version, metadata model.Ve
},
})
}
// We must set the NumDependents field for tasks prior to inserting them in the DB.
model.SetNumDependents(tasksToCreate)

grip.ErrorWhen(len(buildsToCreate) == 0, message.Fields{
"message": "version has no builds",
Expand Down
114 changes: 108 additions & 6 deletions repotracker/repotracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func TestStoreRepositoryRevisions(t *testing.T) {
So(err, ShouldBeNil)
repoTracker := RepoTracker{testConfig, evgProjectRef, NewGithubRepositoryPoller(evgProjectRef)}

// insert distros used in testing.
d := distro.Distro{Id: "test-distro-one"}
So(d.Insert(ctx), ShouldBeNil)
d.Id = "test-distro-two"
Expand Down Expand Up @@ -191,7 +190,6 @@ func TestStoreRepositoryRevisions(t *testing.T) {
poller,
}

// insert distros used in testing.
d := distro.Distro{Id: "test-distro-one"}
So(d.Insert(ctx), ShouldBeNil)
d.Id = "test-distro-two"
Expand Down Expand Up @@ -274,6 +272,113 @@ func TestStoreRepositoryRevisions(t *testing.T) {
})
}

func TestCountNumDependentsAcrossVariants(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, db.ClearCollections(model.VersionCollection, distro.Collection, model.ParserProjectCollection,
build.Collection, task.Collection, model.ProjectConfigCollection, model.ProjectRefCollection))

simpleYml := `
buildvariants:
- name: bv1
display_name: "bv_display"
run_on: d1
tasks:
- name: t1
- name: t2
- name: t3
- name: t4
- name: bv2
display_name: bv2_display
run_on: d2
tasks:
- name: t1
- name: bv3
display_name: bv3_display
run_on: d2
tasks:
- name: t4
depends_on:
- name: t1
variant: bv1
tasks:
- name: t1
- name: t2
depends_on:
- name: t1
- name: t3
depends_on:
- name: t1
- name: t4
depends_on:
- name: t2
- name: t3
`
previouslyActivatedVersion := &model.Version{
Id: "previously activated",
Identifier: "testproject",
Requester: evergreen.RepotrackerVersionRequester,
BuildVariants: []model.VersionBuildStatus{
{
BuildVariant: "bv1",
BatchTimeTasks: []model.BatchTimeTaskStatus{
{
TaskName: "t1",
ActivationStatus: model.ActivationStatus{
Activated: true,
ActivateAt: time.Now().Add(-11 * time.Minute),
},
},
},
ActivationStatus: model.ActivationStatus{
Activated: true,
ActivateAt: time.Now().Add(-11 * time.Minute),
},
},
{
BuildVariant: "bv2",
ActivationStatus: model.ActivationStatus{
Activated: true,
ActivateAt: time.Now().Add(-11 * time.Minute),
},
},
},
}
require.NoError(t, previouslyActivatedVersion.Insert())

d := distro.Distro{Id: "d1"}
require.NoError(t, d.Insert(ctx))
d.Id = "d2"
require.NoError(t, d.Insert(ctx))

pRef := &model.ProjectRef{
Id: "testproject",
BatchTime: 0,
}
require.NoError(t, pRef.Insert())

p := &model.Project{}
pp, err := model.LoadProjectInto(ctx, []byte(simpleYml), nil, "testproject", p)
assert.NoError(t, err)
require.NotNil(t, pp)

// Create new version to use for activating
revisions := []model.Revision{
*createTestRevision("yes", time.Now()),
}
repoTracker := RepoTracker{
testConfig,
pRef,
NewMockRepoPoller(pp, revisions),
}
assert.NoError(t, repoTracker.StoreRevisions(ctx, revisions))

bv1t1, err := task.FindOne(db.Query(bson.M{task.BuildVariantKey: "bv1", task.DisplayNameKey: "t1"}))
require.NoError(t, err)
require.NotNil(t, bv1t1)
assert.Equal(t, bv1t1.NumDependents, 4)
}

func TestBatchTimeForTasks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -345,7 +450,6 @@ tasks:
}
assert.NoError(t, previouslyActivatedVersion.Insert())

// insert distros used in testing.
d := distro.Distro{Id: "d1"}
assert.NoError(t, d.Insert(ctx))
d.Id = "d2"
Expand All @@ -361,7 +465,7 @@ tasks:
pp, err := model.LoadProjectInto(ctx, []byte(simpleYml), nil, "testproject", p)
assert.NoError(t, err)

// create new version to use for activating
// Create new version to use for activating
revisions := []model.Revision{
*createTestRevision("yes", time.Now()),
}
Expand Down Expand Up @@ -479,7 +583,6 @@ func TestBatchTimes(t *testing.T) {

So(previouslyActivatedVersion.Insert(), ShouldBeNil)

// insert distros used in testing.
d := distro.Distro{Id: "test-distro-one"}
So(d.Insert(ctx), ShouldBeNil)
d.Id = "test-distro-two"
Expand Down Expand Up @@ -645,7 +748,6 @@ func TestBatchTimes(t *testing.T) {
Requester: evergreen.RepotrackerVersionRequester,
}
So(previouslyActivatedVersion.Insert(), ShouldBeNil)
// insert distros used in testing.
d := distro.Distro{Id: "test-distro-one"}
So(d.Insert(ctx), ShouldBeNil)
d.Id = "test-distro-two"
Expand Down

0 comments on commit b462ffd

Please sign in to comment.