Skip to content

Commit

Permalink
Merge pull request #2898 from renwenlong-github/controller-deadlook
Browse files Browse the repository at this point in the history
fix controller deadlook when wait dependson task
  • Loading branch information
volcano-sh-bot authored Jul 18, 2023
2 parents 5302995 + 6d0ab3b commit f9d22d6
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 56 deletions.
21 changes: 7 additions & 14 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@ package options
import (
"fmt"
"os"
"time"

"github.com/spf13/pflag"

"volcano.sh/volcano/pkg/kube"
)

const (
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultMaxRequeueNum = 15
defaultSchedulerName = "volcano"
defaultHealthzAddress = ":11251"
defaultDetectionPeriodOfDependsOntask = 100 * time.Millisecond
defaultLockObjectNamespace = "volcano-system"
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultMaxRequeueNum = 15
defaultSchedulerName = "volcano"
defaultHealthzAddress = ":11251"
defaultLockObjectNamespace = "volcano-system"
)

// ServerOption is the main context object for the controllers.
Expand All @@ -60,9 +58,6 @@ type ServerOption struct {
// defaulting to 0.0.0.0:11252
HealthzBindAddress string
EnableHealthz bool
// For dependent tasks, there is a detection cycle inside volcano
// It indicates how often to detect the status of dependent tasks
DetectionPeriodOfDependsOntask time.Duration
// To determine whether inherit owner's annotations for pods when create podgroup
InheritOwnerAnnotations bool
}
Expand Down Expand Up @@ -94,8 +89,6 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.MaxRequeueNum, "max-requeue-num", defaultMaxRequeueNum, "The number of times a job, queue or command will be requeued before it is dropped out of the queue")
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default")
fs.DurationVar(&s.DetectionPeriodOfDependsOntask, "detection-period-of-dependson-task", defaultDetectionPeriodOfDependsOntask, "It indicates how often to detect the status of dependent tasks."+
"e.g. --detection-period-of-dependson-task=1s")
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
}

Expand Down
17 changes: 8 additions & 9 deletions cmd/controller-manager/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ func TestAddFlags(t *testing.T) {
QPS: defaultQPS,
Burst: 200,
},
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerNames: []string{"volcano", "volcano2"},
MaxRequeueNum: defaultMaxRequeueNum,
HealthzBindAddress: ":11251",
DetectionPeriodOfDependsOntask: defaultDetectionPeriodOfDependsOntask,
InheritOwnerAnnotations: true,
EnableLeaderElection: true,
LockObjectNamespace: defaultLockObjectNamespace,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerNames: []string{"volcano", "volcano2"},
MaxRequeueNum: defaultMaxRequeueNum,
HealthzBindAddress: ":11251",
InheritOwnerAnnotations: true,
EnableLeaderElection: true,
LockObjectNamespace: defaultLockObjectNamespace,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
3 changes: 0 additions & 3 deletions cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/cmd/controller-manager/app/options"
"volcano.sh/volcano/pkg/controllers/framework"
"volcano.sh/volcano/pkg/controllers/job"
"volcano.sh/volcano/pkg/kube"
)

Expand All @@ -61,8 +60,6 @@ func Run(opt *options.ServerOption) error {
}
}

job.SetDetectionPeriodOfDependsOntask(opt.DetectionPeriodOfDependsOntask)

run := startControllers(config, opt)

if !opt.EnableLeaderElection {
Expand Down
46 changes: 26 additions & 20 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -378,7 +377,16 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
go func(taskName string, podToCreateEachTask []*v1.Pod) {
taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)
if job.Spec.Tasks[taskIndex].DependsOn != nil {
cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job)
if !cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job) {
klog.V(3).Infof("Job %s/%s depends on task not ready", job.Name, job.Namespace)
// release wait group
for _, pod := range podToCreateEachTask {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
}(pod)
}
return
}
}

for _, pod := range podToCreateEachTask {
Expand Down Expand Up @@ -477,31 +485,29 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
return nil
}

func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskName string, taskIndex int, podToCreateEachTask []*v1.Pod, job *batch.Job) {
func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskName string, taskIndex int, podToCreateEachTask []*v1.Pod, job *batch.Job) bool {
if job.Spec.Tasks[taskIndex].DependsOn == nil {
return
return true
}

dependsOn := *job.Spec.Tasks[taskIndex].DependsOn
if len(dependsOn.Name) > 1 && dependsOn.Iteration == batch.IterationAny {
wait.PollInfinite(detectionPeriodOfDependsOntask, func() (bool, error) {
for _, task := range dependsOn.Name {
if cc.isDependsOnPodsReady(task, job) {
return true, nil
}
// any ready to create task, return true
for _, task := range dependsOn.Name {
if cc.isDependsOnPodsReady(task, job) {
return true
}
return false, nil
})
} else {
for _, dependsOnTask := range dependsOn.Name {
wait.PollInfinite(detectionPeriodOfDependsOntask, func() (bool, error) {
if cc.isDependsOnPodsReady(dependsOnTask, job) {
return true, nil
}
return false, nil
})
}
// all not ready to skip create task, return false
return false
}
for _, dependsOnTask := range dependsOn.Name {
// any not ready to skip create task, return false
if !cc.isDependsOnPodsReady(dependsOnTask, job) {
return false
}
}
// all ready to create task, return true
return true
}

func (cc *jobcontroller) isDependsOnPodsReady(task string, job *batch.Job) bool {
Expand Down
108 changes: 105 additions & 3 deletions pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"context"
"errors"
"fmt"
"reflect"
"testing"

"github.com/agiledragon/gomonkey/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"testing"

"volcano.sh/apis/pkg/apis/batch/v1alpha1"
schedulingapi "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
Expand Down Expand Up @@ -261,6 +260,109 @@ func TestSyncJobFunc(t *testing.T) {
Plugins: []string{"svc", "ssh", "env"},
ExpectVal: nil,
},
{
Name: "SyncJob with dependsOn job can't find the dependent task",
/*
Work dependsOn Master task, preempt actions causes controller deadlock
controller master,work scheduler
| | <---preempt----- |
| | <---kill work--- |
| ----watch work kill----> | |
| | <---kill master-- |
| ----create work pods---> | |
| --wait master running--> | |
| | |
| ---watch master kill---> | |
| --push master to queue-> | |
| -wait process to create-> | |
*/
Job: &v1alpha1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
ResourceVersion: "100",
UID: "e7f18111-1cec-11ea-b688-fa163ec79500",
},
Spec: v1alpha1.JobSpec{
Tasks: []v1alpha1.TaskSpec{
{
Name: "master",
Replicas: 1,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
},
{
Name: "work",
Replicas: 3,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
DependsOn: &v1alpha1.DependsOn{
Name: []string{"master"},
},
},
},
},
Status: v1alpha1.JobStatus{
State: v1alpha1.JobState{
Phase: v1alpha1.Pending,
},
},
},
PodGroup: &schedulingapi.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "job1-e7f18111-1cec-11ea-b688-fa163ec79500",
Namespace: namespace,
},
Spec: schedulingapi.PodGroupSpec{
MinResources: &v1.ResourceList{},
MinTaskMember: map[string]int32{},
},
Status: schedulingapi.PodGroupStatus{
Phase: schedulingapi.PodGroupInqueue,
},
},
PodRetainPhase: state.PodRetainPhaseNone,
UpdateStatus: nil,
JobInfo: &apis.JobInfo{
Namespace: namespace,
Name: "jobinfo1",
Pods: map[string]map[string]*v1.Pod{
"work": {
"job1-work-0": buildPod(namespace, "job1-work-0", v1.PodRunning, nil),
"job1-work-1": buildPod(namespace, "job1-work-1", v1.PodRunning, nil),
},
},
},
Pods: map[string]*v1.Pod{
"job1-work-0": buildPod(namespace, "job1-work-0", v1.PodRunning, nil),
"job1-work-1": buildPod(namespace, "job1-work-1", v1.PodRunning, nil),
},
TotalNumPods: 4,
Plugins: []string{"svc", "ssh", "env"},
ExpectVal: nil,
},
}
for i, testcase := range testcases {

Expand Down
7 changes: 0 additions & 7 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package job

import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,8 +32,6 @@ import (
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
)

var detectionPeriodOfDependsOntask time.Duration

// MakePodName append podname,jobname,taskName and index and returns the string.
func MakePodName(jobName string, taskName string, index int) string {
return fmt.Sprintf(jobhelpers.PodNameFmt, jobName, taskName, index)
Expand Down Expand Up @@ -258,7 +255,3 @@ func isControlledBy(obj metav1.Object, gvk schema.GroupVersionKind) bool {
}
return false
}

func SetDetectionPeriodOfDependsOntask(period time.Duration) {
detectionPeriodOfDependsOntask = period
}

0 comments on commit f9d22d6

Please sign in to comment.