From 1d7c42b144eb3a01a30cfe774b16a847627b2716 Mon Sep 17 00:00:00 2001 From: Ren Date: Wed, 7 Jun 2023 17:56:14 +0800 Subject: [PATCH 1/3] delete dependson task wait.PollInfinite Signed-off-by: Ren --- cmd/controller-manager/app/options/options.go | 24 ++-- cmd/controller-manager/app/server.go | 3 - pkg/controllers/job/job_controller_actions.go | 44 ++++--- .../job/job_controller_actions_test.go | 108 +++++++++++++++++- pkg/controllers/job/job_controller_util.go | 8 -- 5 files changed, 139 insertions(+), 48 deletions(-) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 31b59554d7..51caf49091 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -18,23 +18,20 @@ package options import ( "fmt" - "os" - "time" - "github.com/spf13/pflag" + "os" "volcano.sh/volcano/pkg/kube" ) const ( - defaultQPS = 50.0 - defaultBurst = 100 - defaultWorkers = 3 - defaultMaxRequeueNum = 15 - defaultSchedulerName = "volcano" - defaultHealthzAddress = ":11251" - defaultDetectionPeriodOfDependsOntask = 100 * time.Millisecond - defaultLockObjectNamespace = "volcano-system" + defaultQPS = 50.0 + defaultBurst = 100 + defaultWorkers = 3 + defaultMaxRequeueNum = 15 + defaultSchedulerName = "volcano" + defaultHealthzAddress = ":11251" + defaultLockObjectNamespace = "volcano-system" ) // ServerOption is the main context object for the controllers. @@ -60,9 +57,6 @@ type ServerOption struct { // defaulting to 0.0.0.0:11252 HealthzBindAddress string EnableHealthz bool - // For dependent tasks, there is a detection cycle inside volcano - // It indicates how often to detect the status of dependent tasks - DetectionPeriodOfDependsOntask time.Duration // To determine whether inherit owner's annotations for pods when create podgroup InheritOwnerAnnotations bool } @@ -94,8 +88,6 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.MaxRequeueNum, "max-requeue-num", defaultMaxRequeueNum, "The number of times a job, queue or command will be requeued before it is dropped out of the queue") fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.") fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default") - fs.DurationVar(&s.DetectionPeriodOfDependsOntask, "detection-period-of-dependson-task", defaultDetectionPeriodOfDependsOntask, "It indicates how often to detect the status of dependent tasks."+ - "e.g. --detection-period-of-dependson-task=1s") fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default") } diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index eeca173fce..41b72da9ea 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -38,7 +38,6 @@ import ( vcclientset "volcano.sh/apis/pkg/client/clientset/versioned" "volcano.sh/volcano/cmd/controller-manager/app/options" "volcano.sh/volcano/pkg/controllers/framework" - "volcano.sh/volcano/pkg/controllers/job" "volcano.sh/volcano/pkg/kube" ) @@ -61,8 +60,6 @@ func Run(opt *options.ServerOption) error { } } - job.SetDetectionPeriodOfDependsOntask(opt.DetectionPeriodOfDependsOntask) - run := startControllers(config, opt) if !opt.EnableLeaderElection { diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 9dad507b4e..a4afcbfa34 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -18,6 +18,7 @@ package job import ( "context" + "errors" "fmt" "reflect" "sort" @@ -28,7 +29,6 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" quotav1 "k8s.io/apiserver/pkg/quota/v1" "k8s.io/klog/v2" @@ -378,7 +378,17 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat go func(taskName string, podToCreateEachTask []*v1.Pod) { taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job) if job.Spec.Tasks[taskIndex].DependsOn != nil { - cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job) + if !cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job) { + klog.Errorf("Job %s/%s depends on task not ready", job.Name, job.Namespace) + creationErrs = append(creationErrs, errors.New(fmt.Sprintf("Job %s/%s depends on task not ready", job.Name, job.Namespace))) + // release wait group + for _, pod := range podToCreateEachTask { + go func(pod *v1.Pod) { + defer waitCreationGroup.Done() + }(pod) + } + return + } } for _, pod := range podToCreateEachTask { @@ -477,31 +487,29 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat return nil } -func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskName string, taskIndex int, podToCreateEachTask []*v1.Pod, job *batch.Job) { +func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskName string, taskIndex int, podToCreateEachTask []*v1.Pod, job *batch.Job) bool { if job.Spec.Tasks[taskIndex].DependsOn == nil { - return + return true } - dependsOn := *job.Spec.Tasks[taskIndex].DependsOn if len(dependsOn.Name) > 1 && dependsOn.Iteration == batch.IterationAny { - wait.PollInfinite(detectionPeriodOfDependsOntask, func() (bool, error) { - for _, task := range dependsOn.Name { - if cc.isDependsOnPodsReady(task, job) { - return true, nil - } + // any ready to create task + for _, task := range dependsOn.Name { + if cc.isDependsOnPodsReady(task, job) { + return true } - return false, nil - }) + } + return false } else { for _, dependsOnTask := range dependsOn.Name { - wait.PollInfinite(detectionPeriodOfDependsOntask, func() (bool, error) { - if cc.isDependsOnPodsReady(dependsOnTask, job) { - return true, nil - } - return false, nil - }) + // any not ready to skip + if !cc.isDependsOnPodsReady(dependsOnTask, job) { + return false + } + return true } } + return true } func (cc *jobcontroller) isDependsOnPodsReady(task string, job *batch.Job) bool { diff --git a/pkg/controllers/job/job_controller_actions_test.go b/pkg/controllers/job/job_controller_actions_test.go index 687b965d32..a756020de9 100644 --- a/pkg/controllers/job/job_controller_actions_test.go +++ b/pkg/controllers/job/job_controller_actions_test.go @@ -20,12 +20,11 @@ import ( "context" "errors" "fmt" - "reflect" - "testing" - "github.com/agiledragon/gomonkey/v2" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "reflect" + "testing" "volcano.sh/apis/pkg/apis/batch/v1alpha1" schedulingapi "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -261,6 +260,109 @@ func TestSyncJobFunc(t *testing.T) { Plugins: []string{"svc", "ssh", "env"}, ExpectVal: nil, }, + { + Name: "SyncJob with dependsOn job can't find the dependent task", + /* + Work dependsOn Master task, preempt actions causes controller deadlock + controller master,work scheduler + | | <---preempt----- | + | | <---kill work--- | + | ----watch work kill----> | | + | | <---kill master-- | + | ----create work pods---> | | + | --wait master running--> | | + | | | + | ---watch master kill---> | | + | --push master to queue-> | | + | -wait process to create-> | | + */ + Job: &v1alpha1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: namespace, + ResourceVersion: "100", + UID: "e7f18111-1cec-11ea-b688-fa163ec79500", + }, + Spec: v1alpha1.JobSpec{ + Tasks: []v1alpha1.TaskSpec{ + { + Name: "master", + Replicas: 1, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pods", + Namespace: namespace, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "Containers", + }, + }, + }, + }, + }, + { + Name: "work", + Replicas: 3, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pods", + Namespace: namespace, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "Containers", + }, + }, + }, + }, + DependsOn: &v1alpha1.DependsOn{ + Name: []string{"master"}, + }, + }, + }, + }, + Status: v1alpha1.JobStatus{ + State: v1alpha1.JobState{ + Phase: v1alpha1.Pending, + }, + }, + }, + PodGroup: &schedulingapi.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1-e7f18111-1cec-11ea-b688-fa163ec79500", + Namespace: namespace, + }, + Spec: schedulingapi.PodGroupSpec{ + MinResources: &v1.ResourceList{}, + MinTaskMember: map[string]int32{}, + }, + Status: schedulingapi.PodGroupStatus{ + Phase: schedulingapi.PodGroupInqueue, + }, + }, + PodRetainPhase: state.PodRetainPhaseNone, + UpdateStatus: nil, + JobInfo: &apis.JobInfo{ + Namespace: namespace, + Name: "jobinfo1", + Pods: map[string]map[string]*v1.Pod{ + "work": { + "job1-work-0": buildPod(namespace, "job1-work-0", v1.PodRunning, nil), + "job1-work-1": buildPod(namespace, "job1-work-1", v1.PodRunning, nil), + }, + }, + }, + Pods: map[string]*v1.Pod{ + "job1-work-0": buildPod(namespace, "job1-work-0", v1.PodRunning, nil), + "job1-work-1": buildPod(namespace, "job1-work-1", v1.PodRunning, nil), + }, + TotalNumPods: 4, + Plugins: []string{"svc", "ssh", "env"}, + ExpectVal: nil, + }, } for i, testcase := range testcases { diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 6f93c706ac..882de6e0c0 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -18,8 +18,6 @@ package job import ( "fmt" - "time" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -33,8 +31,6 @@ import ( jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers" ) -var detectionPeriodOfDependsOntask time.Duration - // MakePodName append podname,jobname,taskName and index and returns the string. func MakePodName(jobName string, taskName string, index int) string { return fmt.Sprintf(jobhelpers.PodNameFmt, jobName, taskName, index) @@ -258,7 +254,3 @@ func isControlledBy(obj metav1.Object, gvk schema.GroupVersionKind) bool { } return false } - -func SetDetectionPeriodOfDependsOntask(period time.Duration) { - detectionPeriodOfDependsOntask = period -} From 7a9f1ba323a40b918e26267446fbb7acf237e8a5 Mon Sep 17 00:00:00 2001 From: Ren Date: Wed, 14 Jun 2023 15:19:00 +0800 Subject: [PATCH 2/3] delete error log Signed-off-by: Ren --- pkg/controllers/job/job_controller_actions.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index a4afcbfa34..f75a0ede70 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -18,7 +18,6 @@ package job import ( "context" - "errors" "fmt" "reflect" "sort" @@ -379,8 +378,7 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job) if job.Spec.Tasks[taskIndex].DependsOn != nil { if !cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job) { - klog.Errorf("Job %s/%s depends on task not ready", job.Name, job.Namespace) - creationErrs = append(creationErrs, errors.New(fmt.Sprintf("Job %s/%s depends on task not ready", job.Name, job.Namespace))) + klog.V(4).Infof("Job %s/%s depends on task not ready", job.Name, job.Namespace) // release wait group for _, pod := range podToCreateEachTask { go func(pod *v1.Pod) { From 6d0ab3b8d1a438919f062ddf4bbea05ced69bce9 Mon Sep 17 00:00:00 2001 From: Ren Date: Sun, 25 Jun 2023 14:27:30 +0800 Subject: [PATCH 3/3] change wait dependson job log level to error Signed-off-by: Ren --- cmd/controller-manager/app/options/options.go | 3 ++- .../app/options/options_test.go | 17 ++++++++--------- pkg/controllers/job/job_controller_actions.go | 18 +++++++++--------- pkg/controllers/job/job_controller_util.go | 1 + 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 51caf49091..45e084e979 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -18,9 +18,10 @@ package options import ( "fmt" - "github.com/spf13/pflag" "os" + "github.com/spf13/pflag" + "volcano.sh/volcano/pkg/kube" ) diff --git a/cmd/controller-manager/app/options/options_test.go b/cmd/controller-manager/app/options/options_test.go index e3a7f56b82..e728ba89d5 100644 --- a/cmd/controller-manager/app/options/options_test.go +++ b/cmd/controller-manager/app/options/options_test.go @@ -46,15 +46,14 @@ func TestAddFlags(t *testing.T) { QPS: defaultQPS, Burst: 200, }, - PrintVersion: false, - WorkerThreads: defaultWorkers, - SchedulerNames: []string{"volcano", "volcano2"}, - MaxRequeueNum: defaultMaxRequeueNum, - HealthzBindAddress: ":11251", - DetectionPeriodOfDependsOntask: defaultDetectionPeriodOfDependsOntask, - InheritOwnerAnnotations: true, - EnableLeaderElection: true, - LockObjectNamespace: defaultLockObjectNamespace, + PrintVersion: false, + WorkerThreads: defaultWorkers, + SchedulerNames: []string{"volcano", "volcano2"}, + MaxRequeueNum: defaultMaxRequeueNum, + HealthzBindAddress: ":11251", + InheritOwnerAnnotations: true, + EnableLeaderElection: true, + LockObjectNamespace: defaultLockObjectNamespace, } if !reflect.DeepEqual(expected, s) { diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index f75a0ede70..10dae61615 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -378,7 +378,7 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job) if job.Spec.Tasks[taskIndex].DependsOn != nil { if !cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job) { - klog.V(4).Infof("Job %s/%s depends on task not ready", job.Name, job.Namespace) + klog.V(3).Infof("Job %s/%s depends on task not ready", job.Name, job.Namespace) // release wait group for _, pod := range podToCreateEachTask { go func(pod *v1.Pod) { @@ -491,22 +491,22 @@ func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskName string, taskInd } dependsOn := *job.Spec.Tasks[taskIndex].DependsOn if len(dependsOn.Name) > 1 && dependsOn.Iteration == batch.IterationAny { - // any ready to create task + // any ready to create task, return true for _, task := range dependsOn.Name { if cc.isDependsOnPodsReady(task, job) { return true } } + // all not ready to skip create task, return false return false - } else { - for _, dependsOnTask := range dependsOn.Name { - // any not ready to skip - if !cc.isDependsOnPodsReady(dependsOnTask, job) { - return false - } - return true + } + for _, dependsOnTask := range dependsOn.Name { + // any not ready to skip create task, return false + if !cc.isDependsOnPodsReady(dependsOnTask, job) { + return false } } + // all ready to create task, return true return true } diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 882de6e0c0..d5ca852ed9 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -18,6 +18,7 @@ package job import ( "fmt" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema"