diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index cfab5ed80c..24007f5cd0 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -165,7 +165,7 @@ func TestAllocate(t *testing.T) { t.Run(test.name, func(t *testing.T) { binder := &util.FakeBinder{ Binds: map[string]string{}, - Channel: make(chan string), + Channel: make(chan string, 10), } schedulerCache := &cache.SchedulerCache{ Nodes: make(map[string]*api.NodeInfo), @@ -323,7 +323,7 @@ func TestAllocateWithDynamicPVC(t *testing.T) { fakeVolumeBinder := util.NewFakeVolumeBinder(kubeClient) binder := &util.FakeBinder{ Binds: map[string]string{}, - Channel: make(chan string), + Channel: make(chan string, 10), } schedulerCache := &cache.SchedulerCache{ Nodes: make(map[string]*api.NodeInfo), diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 0b04baf35b..9b29639020 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -309,6 +309,22 @@ func (su *defaultStatusUpdater) UpdatePodGroup(pg *schedulingapi.PodGroup) (*sch return podGroupInfo, nil } +// UpdateQueueStatus will update the status of queue +func (su *defaultStatusUpdater) UpdateQueueStatus(queue *schedulingapi.QueueInfo) error { + var newQueue = &vcv1beta1.Queue{} + if err := schedulingscheme.Scheme.Convert(queue.Queue, newQueue, nil); err != nil { + klog.Errorf("error occurred in converting scheduling.Queue to v1beta1.Queue: %s", err.Error()) + return err + } + + _, err := su.vcclient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error()) + return err + } + return nil +} + type defaultVolumeBinder struct { volumeBinder volumescheduling.SchedulerVolumeBinder } @@ -441,7 +457,7 @@ func (sc *SchedulerCache) setBatchBindParallel() { func (sc *SchedulerCache) setDefaultVolumeBinder() { logger := klog.FromContext(context.TODO()) var capacityCheck *volumescheduling.CapacityCheck - if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) { + if options.ServerOpts != nil && options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) { capacityCheck = &volumescheduling.CapacityCheck{ CSIDriverInformer: sc.csiDriverInformer, CSIStorageCapacityInformer: sc.csiStorageCapacityInformer, @@ -655,7 +671,7 @@ func (sc *SchedulerCache) addEventHandler() { }, ) - if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) { + if options.ServerOpts != nil && options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) { sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers() sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities() } @@ -693,7 +709,7 @@ func (sc *SchedulerCache) addEventHandler() { }, }) - if options.ServerOpts.EnablePriorityClass && utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) { + if options.ServerOpts != nil && options.ServerOpts.EnablePriorityClass && utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) { sc.pcInformer = informerFactory.Scheduling().V1().PriorityClasses() sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: sc.AddPriorityClass, @@ -1378,18 +1394,7 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b // UpdateQueueStatus update the status of queue. func (sc *SchedulerCache) UpdateQueueStatus(queue *schedulingapi.QueueInfo) error { - var newQueue = &vcv1beta1.Queue{} - if err := schedulingscheme.Scheme.Convert(queue.Queue, newQueue, nil); err != nil { - klog.Errorf("error occurred in converting scheduling.Queue to v1beta1.Queue: %s", err.Error()) - return err - } - - _, err := sc.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}) - if err != nil { - klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error()) - return err - } - return nil + return sc.StatusUpdater.UpdateQueueStatus(queue) } func (sc *SchedulerCache) recordPodGroupEvent(podGroup *schedulingapi.PodGroup, eventType, reason, msg string) { diff --git a/pkg/scheduler/cache/cache_mock.go b/pkg/scheduler/cache/cache_mock.go index 03f08cc190..87c17ece20 100644 --- a/pkg/scheduler/cache/cache_mock.go +++ b/pkg/scheduler/cache/cache_mock.go @@ -84,7 +84,7 @@ func checkAndSetDefaultInterface(sc *SchedulerCache) { } func getNodeWorkers() uint32 { - if options.ServerOpts.NodeWorkerThreads > 0 { + if options.ServerOpts != nil && options.ServerOpts.NodeWorkerThreads > 0 { return options.ServerOpts.NodeWorkerThreads } threads, err := strconv.Atoi(os.Getenv("NODE_WORKER_THREADS")) @@ -116,7 +116,7 @@ func newMockSchedulerCache(schedulerName string) *SchedulerCache { NodeList: []string{}, } - if len(options.ServerOpts.NodeSelector) > 0 { + if options.ServerOpts != nil && len(options.ServerOpts.NodeSelector) > 0 { msc.updateNodeSelectors(options.ServerOpts.NodeSelector) } msc.setBatchBindParallel() diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index dd68896b33..63c704e4ad 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -111,6 +111,7 @@ type Evictor interface { type StatusUpdater interface { UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error) + UpdateQueueStatus(queue *api.QueueInfo) error } // BatchBinder updates podgroup or job information diff --git a/pkg/scheduler/plugins/drf/hdrf_test.go b/pkg/scheduler/plugins/drf/hdrf_test.go index cc398a1fae..e73d328a65 100644 --- a/pkg/scheduler/plugins/drf/hdrf_test.go +++ b/pkg/scheduler/plugins/drf/hdrf_test.go @@ -252,7 +252,7 @@ func TestHDRF(t *testing.T) { binder := &util.FakeBinder{ Binds: map[string]string{}, - Channel: make(chan string), + Channel: make(chan string, 300), } schedulerCache := &cache.SchedulerCache{ Nodes: make(map[string]*api.NodeInfo), diff --git a/pkg/scheduler/plugins/predicates/predicates_test.go b/pkg/scheduler/plugins/predicates/predicates_test.go index 1cbb1e77b1..191240654d 100644 --- a/pkg/scheduler/plugins/predicates/predicates_test.go +++ b/pkg/scheduler/plugins/predicates/predicates_test.go @@ -147,7 +147,7 @@ func TestEventHandler(t *testing.T) { // initialize schedulerCache binder := &util.FakeBinder{ Binds: map[string]string{}, - Channel: make(chan string), + Channel: make(chan string, 10), } recorder := record.NewFakeRecorder(100) go func() { diff --git a/pkg/scheduler/uthelper/helper.go b/pkg/scheduler/uthelper/helper.go new file mode 100644 index 0000000000..a3fb030ec6 --- /dev/null +++ b/pkg/scheduler/uthelper/helper.go @@ -0,0 +1,234 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package uthelper + +import ( + "fmt" + "reflect" + "time" + + v1 "k8s.io/api/core/v1" + schedulingv1 "k8s.io/api/scheduling/v1" + + "volcano.sh/apis/pkg/apis/scheduling" + vcapisv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/cache" + "volcano.sh/volcano/pkg/scheduler/conf" + "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/util" +) + +// RegistPlugins plugins +func RegistPlugins(plugins map[string]framework.PluginBuilder) { + for name, plugin := range plugins { + framework.RegisterPluginBuilder(name, plugin) + } +} + +// TestCommonStruct is the most common used resource when do UT +// others can wrap it in a new struct +type TestCommonStruct struct { + Name string + Plugins map[string]framework.PluginBuilder // plugins for each case + Pods []*v1.Pod + Nodes []*v1.Node + PodGroups []*vcapisv1.PodGroup + Queues []*vcapisv1.Queue + PriClass []*schedulingv1.PriorityClass + Bind map[string]string // bind results: ns/podName -> nodeName + PipeLined map[string][]string // pipelined results: map[jobID][]{nodename} + Evicted []string // evicted pods list of ns/podName + Status map[api.JobID]scheduling.PodGroupPhase // final status + BindsNum int // binds events numbers + EvictNum int // evict events numbers, include preempted and reclaimed evict events + + // fake interface instance when check results need + stop chan struct{} + binder cache.Binder + evictor cache.Evictor + stsUpdator cache.StatusUpdater + volBinder cache.VolumeBinder + ssn *framework.Session // store opened session +} + +var _ Interface = &TestCommonStruct{} + +// RegistSession open session with tiers and configuration, and mock schedulerCache with self-defined FakeBinder and FakeEvictor +func (test *TestCommonStruct) RegistSession(tiers []conf.Tier, config []conf.Configuration) *framework.Session { + binder := &util.FakeBinder{ + Binds: map[string]string{}, + Channel: make(chan string), + } + evictor := &util.FakeEvictor{ + Channel: make(chan string), + } + stsUpdator := &util.FakeStatusUpdater{} + test.binder = binder + test.evictor = evictor + test.stop = make(chan struct{}) + // Create scheduler cache with self-defined binder and evictor + schedulerCache := cache.NewCustomMockSchedulerCache("utmock-scheduler", binder, evictor, stsUpdator, nil, nil, nil) + test.stsUpdator = schedulerCache.StatusUpdater + test.volBinder = schedulerCache.VolumeBinder + + for _, node := range test.Nodes { + schedulerCache.AddOrUpdateNode(node) + } + for _, pod := range test.Pods { + schedulerCache.AddPod(pod) + } + for _, pg := range test.PodGroups { + schedulerCache.AddPodGroupV1beta1(pg) + } + for _, queue := range test.Queues { + schedulerCache.AddQueueV1beta1(queue) + } + for _, pc := range test.PriClass { + schedulerCache.AddPriorityClass(pc) + } + + RegistPlugins(test.Plugins) + ssn := framework.OpenSession(schedulerCache, tiers, config) + test.ssn = ssn + schedulerCache.Run(test.stop) + return ssn +} + +// Run choose to run passed in actions; if no actions provided, will panic +func (test *TestCommonStruct) Run(actions []framework.Action) { + if len(actions) == 0 { + panic("no actions provided, please specify a list of actions to execute") + } + for _, action := range actions { + action.Initialize() + action.Execute(test.ssn) + action.UnInitialize() + } +} + +// Close do release resource and clean up +func (test *TestCommonStruct) Close() { + framework.CloseSession(test.ssn) + framework.CleanupPluginBuilders() + close(test.stop) +} + +// CheckAll checks all the need status +func (test *TestCommonStruct) CheckAll(caseIndex int) (err error) { + if err = test.CheckBind(caseIndex); err != nil { + return + } + if err = test.CheckEvict(caseIndex); err != nil { + return + } + if err = test.CheckPipelined(caseIndex); err != nil { + return + } + return test.CheckPGStatus(caseIndex) +} + +// CheckBind check expected bind result +func (test *TestCommonStruct) CheckBind(caseIndex int) error { + binder := test.binder.(*util.FakeBinder) + for i := 0; i < test.BindsNum; i++ { + select { + case <-binder.Channel: + case <-time.After(300 * time.Millisecond): + return fmt.Errorf("Failed to get Bind request in case %d(%s).", caseIndex, test.Name) + } + } + + if len(test.Bind) != len(binder.Binds) { + return fmt.Errorf("case %d(%s) check bind: \nwant: %v, \ngot %v ", caseIndex, test.Name, test.Bind, binder.Binds) + } + for key, value := range test.Bind { + got := binder.Binds[key] + if value != got { + return fmt.Errorf("case %d(%s) check bind: \nwant: %v->%v\n got: %v->%v ", caseIndex, test.Name, key, value, key, got) + } + } + return nil +} + +// CheckEvict check the evicted result +func (test *TestCommonStruct) CheckEvict(caseIndex int) error { + evictor := test.evictor.(*util.FakeEvictor) + for i := 0; i < test.EvictNum; i++ { + select { + case <-evictor.Channel: + case <-time.After(300 * time.Millisecond): + return fmt.Errorf("Failed to get Evict request in case %d(%s).", caseIndex, test.Name) + } + } + + evicts := evictor.Evicts() + if len(test.Evicted) != len(evicts) { + return fmt.Errorf("case %d(%s) check evict: \nwant: %v, \ngot %v ", caseIndex, test.Name, test.Evicted, evicts) + } + + expect := map[string]int{} // evicted number + got := map[string]int{} + for _, v := range test.Evicted { + expect[v]++ + } + for _, v := range evicts { + got[v]++ + } + + if !reflect.DeepEqual(expect, got) { + return fmt.Errorf("case %d(%s) check evict: \nwant: %v\n got: %v ", caseIndex, test.Name, expect, got) + } + return nil +} + +// CheckPGStatus check job's podgroups status +func (test *TestCommonStruct) CheckPGStatus(caseIndex int) error { + ssn := test.ssn + for jobID, phase := range test.Status { + job := ssn.Jobs[jobID] + if job == nil { + return fmt.Errorf("case %d(%s) check podgroup status, job <%v> doesn't exist in session", caseIndex, test.Name, jobID) + } + got := job.PodGroup.Status.Phase + if phase != got { + return fmt.Errorf("case %d(%s) check podgroup <%v> status:\n want: %v, got: %v", caseIndex, test.Name, jobID, phase, got) + } + } + return nil +} + +// CheckPipelined checks pipeline results +func (test *TestCommonStruct) CheckPipelined(caseIndex int) error { + ssn := test.ssn + for jobID, nodes := range test.PipeLined { + job := ssn.Jobs[api.JobID(jobID)] + if job == nil { + return fmt.Errorf("case %d(%s) check pipeline, job <%v> doesn't exist in session", caseIndex, test.Name, jobID) + } + pipeLined := job.TaskStatusIndex[api.Pipelined] + if len(pipeLined) == 0 { + return fmt.Errorf("case %d(%s) check pipeline, want pipelined job: %v, actualy, no tasks pipelined to nodes %v", caseIndex, test.Name, jobID, nodes) + } + for _, task := range pipeLined { + if !Contains(nodes, task.NodeName) { + return fmt.Errorf("case %d(%s) check pipeline: actual: %v->%v, want: %v->%v", caseIndex, test.Name, task.Name, task.NodeName, task.Name, nodes) + } + } + } + return nil +} diff --git a/pkg/scheduler/uthelper/interface.go b/pkg/scheduler/uthelper/interface.go new file mode 100644 index 0000000000..b67d8c1761 --- /dev/null +++ b/pkg/scheduler/uthelper/interface.go @@ -0,0 +1,42 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package uthelper + +import ( + "volcano.sh/volcano/pkg/scheduler/conf" + "volcano.sh/volcano/pkg/scheduler/framework" +) + +// Interface is UT framework interface +type Interface interface { + // Run executes the actions + Run(actions []framework.Action) + // RegistSession init the session + RegistSession(tiers []conf.Tier, config []conf.Configuration) *framework.Session + // Close release session and do cleanup + Close() + // CheckAll do all checks + CheckAll(caseIndex int) (err error) + // CheckBind just check bind results in allocate action + CheckBind(caseIndex int) error + // CheckEvict just check evict results in preempt or reclaim action + CheckEvict(caseIndex int) error + // CheckPipelined check the pipelined results + CheckPipelined(caseIndex int) error + // CheckPGStatus check job's status + CheckPGStatus(caseIndex int) error +} diff --git a/pkg/scheduler/uthelper/util.go b/pkg/scheduler/uthelper/util.go new file mode 100644 index 0000000000..e767fb7d2d --- /dev/null +++ b/pkg/scheduler/uthelper/util.go @@ -0,0 +1,27 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package uthelper + +// Contains compares a key in a slice and returns true if exist in it +func Contains[T comparable](elements []T, key T) bool { + for _, v := range elements { + if v == key { + return true + } + } + return false +} diff --git a/pkg/scheduler/util/test_utils.go b/pkg/scheduler/util/test_utils.go index 70f208ed33..854432cc5f 100644 --- a/pkg/scheduler/util/test_utils.go +++ b/pkg/scheduler/util/test_utils.go @@ -23,6 +23,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + schedulingv1 "k8s.io/api/scheduling/v1" storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -32,7 +33,6 @@ import ( "k8s.io/klog/v2" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" - "volcano.sh/volcano/pkg/scheduler/api" volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" ) @@ -179,61 +179,14 @@ func BuildDynamicPVC(namespace, name string, req v1.ResourceList) (*v1.Persisten // BuildBestEffortPod builds a BestEffort pod object func BuildBestEffortPod(namespace, name, nodeName string, p v1.PodPhase, groupName string, labels map[string]string, selector map[string]string) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), - Name: name, - Namespace: namespace, - Labels: labels, - Annotations: map[string]string{ - schedulingv1beta1.KubeGroupNameAnnotationKey: groupName, - }, - }, - Status: v1.PodStatus{ - Phase: p, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - NodeSelector: selector, - Containers: []v1.Container{ - { - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{}, - }, - }, - }, - }, - } + return BuildPod(namespace, name, nodeName, p, v1.ResourceList{}, groupName, labels, selector) } // BuildPodWithPriority builds a pod object with priority func BuildPodWithPriority(namespace, name, nodeName string, p v1.PodPhase, req v1.ResourceList, groupName string, labels map[string]string, selector map[string]string, priority *int32) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), - Name: name, - Namespace: namespace, - Labels: labels, - Annotations: map[string]string{ - schedulingv1beta1.KubeGroupNameAnnotationKey: groupName, - }, - }, - Status: v1.PodStatus{ - Phase: p, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - NodeSelector: selector, - Priority: priority, - Containers: []v1.Container{ - { - Resources: v1.ResourceRequirements{ - Requests: req, - }, - }, - }, - }, - } + pod := BuildPod(namespace, name, nodeName, p, req, groupName, labels, selector) + pod.Spec.Priority = priority + return pod } // BuildPodGroup return podgroup with base spec and phase status @@ -276,17 +229,39 @@ func BuildQueue(qname string, weight int32, cap v1.ResourceList) *schedulingv1be } } +// BuildQueueWithAnnos return a Queue with annotations +func BuildQueueWithAnnos(qname string, weight int32, cap v1.ResourceList, annos map[string]string) *schedulingv1beta1.Queue { + queue := BuildQueue(qname, weight, cap) + queue.ObjectMeta.Annotations = annos + return queue +} + +// ////// build in resource ////// +// BuildPriorityClass return pc +func BuildPriorityClass(name string, value int32) *schedulingv1.PriorityClass { + return &schedulingv1.PriorityClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Value: value, + } +} + // FakeBinder is used as fake binder type FakeBinder struct { + sync.Mutex Binds map[string]string Channel chan string } // Bind used by fake binder struct to bind pods func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, error) { + fb.Lock() + defer fb.Unlock() for _, p := range tasks { key := fmt.Sprintf("%v/%v", p.Namespace, p.Name) fb.Binds[key] = p.NodeName + fb.Channel <- key // need to wait binding pod because Bind process is asynchronous } return nil, nil @@ -336,6 +311,11 @@ func (ftsu *FakeStatusUpdater) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, return nil, nil } +// UpdateQueueStatus do fake empty update +func (ftsu *FakeStatusUpdater) UpdateQueueStatus(queue *api.QueueInfo) error { + return nil +} + // FakeVolumeBinder is used as fake volume binder type FakeVolumeBinder struct { volumeBinder volumescheduling.SchedulerVolumeBinder