Skip to content

Commit

Permalink
delete dependson task wait.PollInfinite
Browse files Browse the repository at this point in the history
Signed-off-by: Ren <[email protected]>
  • Loading branch information
Along-Ren committed Jun 7, 2023
1 parent 300cc2c commit a49af3a
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 33 deletions.
6 changes: 1 addition & 5 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
defaultSchedulerName = "volcano"
defaultHealthzAddress = ":11251"
defaultDetectionPeriodOfDependsOntask = 100 * time.Millisecond
defaultRetryMaxNumOfDependsOntask = 1200 // 2min
defaultLockObjectNamespace = "volcano-system"
)

Expand All @@ -60,9 +61,6 @@ type ServerOption struct {
// defaulting to 0.0.0.0:11252
HealthzBindAddress string
EnableHealthz bool
// For dependent tasks, there is a detection cycle inside volcano
// It indicates how often to detect the status of dependent tasks
DetectionPeriodOfDependsOntask time.Duration
// To determine whether inherit owner's annotations for pods when create podgroup
InheritOwnerAnnotations bool
}
Expand Down Expand Up @@ -94,8 +92,6 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.MaxRequeueNum, "max-requeue-num", defaultMaxRequeueNum, "The number of times a job, queue or command will be requeued before it is dropped out of the queue")
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default")
fs.DurationVar(&s.DetectionPeriodOfDependsOntask, "detection-period-of-dependson-task", defaultDetectionPeriodOfDependsOntask, "It indicates how often to detect the status of dependent tasks."+
"e.g. --detection-period-of-dependson-task=1s")
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
}

Expand Down
3 changes: 0 additions & 3 deletions cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/cmd/controller-manager/app/options"
"volcano.sh/volcano/pkg/controllers/framework"
"volcano.sh/volcano/pkg/controllers/job"
"volcano.sh/volcano/pkg/kube"
)

Expand All @@ -61,8 +60,6 @@ func Run(opt *options.ServerOption) error {
}
}

job.SetDetectionPeriodOfDependsOntask(opt.DetectionPeriodOfDependsOntask)

run := startControllers(config, opt)

if !opt.EnableLeaderElection {
Expand Down
44 changes: 26 additions & 18 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package job

import (
"context"
"errors"
"fmt"
"reflect"
"sort"
Expand All @@ -28,7 +29,6 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -378,7 +378,17 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
go func(taskName string, podToCreateEachTask []*v1.Pod) {
taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)
if job.Spec.Tasks[taskIndex].DependsOn != nil {
cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job)
if !cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job) {
klog.Errorf("Job %s/%s depends on task not ready", job.Name, job.Namespace)
creationErrs = append(creationErrs, errors.New(fmt.Sprintf("Job %s/%s depends on task not ready", job.Name, job.Namespace)))
// release wait group
for _, pod := range podToCreateEachTask {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
}(pod)
}
return
}
}

for _, pod := range podToCreateEachTask {
Expand Down Expand Up @@ -477,31 +487,29 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
return nil
}

func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskName string, taskIndex int, podToCreateEachTask []*v1.Pod, job *batch.Job) {
func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskName string, taskIndex int, podToCreateEachTask []*v1.Pod, job *batch.Job) bool {
if job.Spec.Tasks[taskIndex].DependsOn == nil {
return
return true
}

dependsOn := *job.Spec.Tasks[taskIndex].DependsOn
if len(dependsOn.Name) > 1 && dependsOn.Iteration == batch.IterationAny {
wait.PollInfinite(detectionPeriodOfDependsOntask, func() (bool, error) {
for _, task := range dependsOn.Name {
if cc.isDependsOnPodsReady(task, job) {
return true, nil
}
// any ready to create task
for _, task := range dependsOn.Name {
if cc.isDependsOnPodsReady(task, job) {
return true
}
return false, nil
})
}
return false
} else {
for _, dependsOnTask := range dependsOn.Name {
wait.PollInfinite(detectionPeriodOfDependsOntask, func() (bool, error) {
if cc.isDependsOnPodsReady(dependsOnTask, job) {
return true, nil
}
return false, nil
})
// any not ready to skip
if !cc.isDependsOnPodsReady(dependsOnTask, job) {
return false
}
return true
}
}
return true
}

func (cc *jobcontroller) isDependsOnPodsReady(task string, job *batch.Job) bool {
Expand Down
108 changes: 105 additions & 3 deletions pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"context"
"errors"
"fmt"
"reflect"
"testing"

"github.com/agiledragon/gomonkey/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"testing"

"volcano.sh/apis/pkg/apis/batch/v1alpha1"
schedulingapi "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
Expand Down Expand Up @@ -261,6 +260,109 @@ func TestSyncJobFunc(t *testing.T) {
Plugins: []string{"svc", "ssh", "env"},
ExpectVal: nil,
},
{
Name: "SyncJob with dependsOn job can't find the dependent task",
/*
Work dependsOn Master task, preempt actions causes controller deadlock
controller master,work scheduler
| | <---preempt----- |
| | <---kill work--- |
| ----watch work kill----> | |
| | <---kill master-- |
| ----create work pods---> | |
| --wait master running--> | |
| | |
| ---watch master kill---> | |
| --push master to queue-> | |
| -wait process to create-> | |
*/
Job: &v1alpha1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
ResourceVersion: "100",
UID: "e7f18111-1cec-11ea-b688-fa163ec79500",
},
Spec: v1alpha1.JobSpec{
Tasks: []v1alpha1.TaskSpec{
{
Name: "master",
Replicas: 1,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
{
Name: "work",
Replicas: 3,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
DependsOn: &v1alpha1.DependsOn{
Name: []string{"master"},
},
},
},
},
Status: v1alpha1.JobStatus{
State: v1alpha1.JobState{
Phase: v1alpha1.Pending,
},
},
},
PodGroup: &schedulingapi.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "job1-e7f18111-1cec-11ea-b688-fa163ec79500",
Namespace: namespace,
},
Spec: schedulingapi.PodGroupSpec{
MinResources: &v1.ResourceList{},
MinTaskMember: map[string]int32{},
},
Status: schedulingapi.PodGroupStatus{
Phase: schedulingapi.PodGroupInqueue,
},
},
PodRetainPhase: state.PodRetainPhaseNone,
UpdateStatus: nil,
JobInfo: &apis.JobInfo{
Namespace: namespace,
Name: "jobinfo1",
Pods: map[string]map[string]*v1.Pod{
"work": {
"job1-work-0": buildPod(namespace, "job1-work-0", v1.PodRunning, nil),
"job1-work-1": buildPod(namespace, "job1-work-1", v1.PodRunning, nil),
},
},
},
Pods: map[string]*v1.Pod{
"job1-work-0": buildPod(namespace, "job1-work-0", v1.PodRunning, nil),
"job1-work-1": buildPod(namespace, "job1-work-1", v1.PodRunning, nil),
},
TotalNumPods: 4,
Plugins: []string{"svc", "ssh", "env"},
ExpectVal: nil,
},
}
for i, testcase := range testcases {

Expand Down
5 changes: 1 addition & 4 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
)

var detectionPeriodOfDependsOntask time.Duration
var retryMaxNumOfDependsOntask int

// MakePodName append podname,jobname,taskName and index and returns the string.
func MakePodName(jobName string, taskName string, index int) string {
Expand Down Expand Up @@ -258,7 +259,3 @@ func isControlledBy(obj metav1.Object, gvk schema.GroupVersionKind) bool {
}
return false
}

func SetDetectionPeriodOfDependsOntask(period time.Duration) {
detectionPeriodOfDependsOntask = period
}

0 comments on commit a49af3a

Please sign in to comment.