Skip to content

Commit

Permalink
Merge pull request volcano-sh#3343 from lowang-bh/utFramework
Browse files Browse the repository at this point in the history
feat: add ut common framework
  • Loading branch information
volcano-sh-bot authored Mar 11, 2024
2 parents d996dde + 2817300 commit 0a26eb8
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 73 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestAllocate(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
Channel: make(chan string, 10),
}
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestAllocateWithDynamicPVC(t *testing.T) {
fakeVolumeBinder := util.NewFakeVolumeBinder(kubeClient)
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
Channel: make(chan string, 10),
}
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Expand Down
35 changes: 20 additions & 15 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,22 @@ func (su *defaultStatusUpdater) UpdatePodGroup(pg *schedulingapi.PodGroup) (*sch
return podGroupInfo, nil
}

// UpdateQueueStatus will update the status of queue
func (su *defaultStatusUpdater) UpdateQueueStatus(queue *schedulingapi.QueueInfo) error {
var newQueue = &vcv1beta1.Queue{}
if err := schedulingscheme.Scheme.Convert(queue.Queue, newQueue, nil); err != nil {
klog.Errorf("error occurred in converting scheduling.Queue to v1beta1.Queue: %s", err.Error())
return err
}

_, err := su.vcclient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error())
return err
}
return nil
}

type defaultVolumeBinder struct {
volumeBinder volumescheduling.SchedulerVolumeBinder
}
Expand Down Expand Up @@ -441,7 +457,7 @@ func (sc *SchedulerCache) setBatchBindParallel() {
func (sc *SchedulerCache) setDefaultVolumeBinder() {
logger := klog.FromContext(context.TODO())
var capacityCheck *volumescheduling.CapacityCheck
if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
if options.ServerOpts != nil && options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
capacityCheck = &volumescheduling.CapacityCheck{
CSIDriverInformer: sc.csiDriverInformer,
CSIStorageCapacityInformer: sc.csiStorageCapacityInformer,
Expand Down Expand Up @@ -655,7 +671,7 @@ func (sc *SchedulerCache) addEventHandler() {
},
)

if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
if options.ServerOpts != nil && options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers()
sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities()
}
Expand Down Expand Up @@ -693,7 +709,7 @@ func (sc *SchedulerCache) addEventHandler() {
},
})

if options.ServerOpts.EnablePriorityClass && utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) {
if options.ServerOpts != nil && options.ServerOpts.EnablePriorityClass && utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) {
sc.pcInformer = informerFactory.Scheduling().V1().PriorityClasses()
sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddPriorityClass,
Expand Down Expand Up @@ -1378,18 +1394,7 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b

// UpdateQueueStatus update the status of queue.
func (sc *SchedulerCache) UpdateQueueStatus(queue *schedulingapi.QueueInfo) error {
var newQueue = &vcv1beta1.Queue{}
if err := schedulingscheme.Scheme.Convert(queue.Queue, newQueue, nil); err != nil {
klog.Errorf("error occurred in converting scheduling.Queue to v1beta1.Queue: %s", err.Error())
return err
}

_, err := sc.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error())
return err
}
return nil
return sc.StatusUpdater.UpdateQueueStatus(queue)
}

func (sc *SchedulerCache) recordPodGroupEvent(podGroup *schedulingapi.PodGroup, eventType, reason, msg string) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/cache/cache_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func checkAndSetDefaultInterface(sc *SchedulerCache) {
}

func getNodeWorkers() uint32 {
if options.ServerOpts.NodeWorkerThreads > 0 {
if options.ServerOpts != nil && options.ServerOpts.NodeWorkerThreads > 0 {
return options.ServerOpts.NodeWorkerThreads
}
threads, err := strconv.Atoi(os.Getenv("NODE_WORKER_THREADS"))
Expand Down Expand Up @@ -116,7 +116,7 @@ func newMockSchedulerCache(schedulerName string) *SchedulerCache {

NodeList: []string{},
}
if len(options.ServerOpts.NodeSelector) > 0 {
if options.ServerOpts != nil && len(options.ServerOpts.NodeSelector) > 0 {
msc.updateNodeSelectors(options.ServerOpts.NodeSelector)
}
msc.setBatchBindParallel()
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type Evictor interface {
type StatusUpdater interface {
UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error)
UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error)
UpdateQueueStatus(queue *api.QueueInfo) error
}

// BatchBinder updates podgroup or job information
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/drf/hdrf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestHDRF(t *testing.T) {

binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
Channel: make(chan string, 300),
}
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/predicates/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestEventHandler(t *testing.T) {
// initialize schedulerCache
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
Channel: make(chan string, 10),
}
recorder := record.NewFakeRecorder(100)
go func() {
Expand Down
234 changes: 234 additions & 0 deletions pkg/scheduler/uthelper/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
Copyright 2024 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package uthelper

import (
"fmt"
"reflect"
"time"

v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"

"volcano.sh/apis/pkg/apis/scheduling"
vcapisv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
)

// RegistPlugins plugins
func RegistPlugins(plugins map[string]framework.PluginBuilder) {
for name, plugin := range plugins {
framework.RegisterPluginBuilder(name, plugin)
}
}

// TestCommonStruct is the most common used resource when do UT
// others can wrap it in a new struct
type TestCommonStruct struct {
Name string
Plugins map[string]framework.PluginBuilder // plugins for each case
Pods []*v1.Pod
Nodes []*v1.Node
PodGroups []*vcapisv1.PodGroup
Queues []*vcapisv1.Queue
PriClass []*schedulingv1.PriorityClass
Bind map[string]string // bind results: ns/podName -> nodeName
PipeLined map[string][]string // pipelined results: map[jobID][]{nodename}
Evicted []string // evicted pods list of ns/podName
Status map[api.JobID]scheduling.PodGroupPhase // final status
BindsNum int // binds events numbers
EvictNum int // evict events numbers, include preempted and reclaimed evict events

// fake interface instance when check results need
stop chan struct{}
binder cache.Binder
evictor cache.Evictor
stsUpdator cache.StatusUpdater
volBinder cache.VolumeBinder
ssn *framework.Session // store opened session
}

var _ Interface = &TestCommonStruct{}

// RegistSession open session with tiers and configuration, and mock schedulerCache with self-defined FakeBinder and FakeEvictor
func (test *TestCommonStruct) RegistSession(tiers []conf.Tier, config []conf.Configuration) *framework.Session {
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
}
evictor := &util.FakeEvictor{
Channel: make(chan string),
}
stsUpdator := &util.FakeStatusUpdater{}
test.binder = binder
test.evictor = evictor
test.stop = make(chan struct{})
// Create scheduler cache with self-defined binder and evictor
schedulerCache := cache.NewCustomMockSchedulerCache("utmock-scheduler", binder, evictor, stsUpdator, nil, nil, nil)
test.stsUpdator = schedulerCache.StatusUpdater
test.volBinder = schedulerCache.VolumeBinder

for _, node := range test.Nodes {
schedulerCache.AddOrUpdateNode(node)
}
for _, pod := range test.Pods {
schedulerCache.AddPod(pod)
}
for _, pg := range test.PodGroups {
schedulerCache.AddPodGroupV1beta1(pg)
}
for _, queue := range test.Queues {
schedulerCache.AddQueueV1beta1(queue)
}
for _, pc := range test.PriClass {
schedulerCache.AddPriorityClass(pc)
}

RegistPlugins(test.Plugins)
ssn := framework.OpenSession(schedulerCache, tiers, config)
test.ssn = ssn
schedulerCache.Run(test.stop)
return ssn
}

// Run choose to run passed in actions; if no actions provided, will panic
func (test *TestCommonStruct) Run(actions []framework.Action) {
if len(actions) == 0 {
panic("no actions provided, please specify a list of actions to execute")
}
for _, action := range actions {
action.Initialize()
action.Execute(test.ssn)
action.UnInitialize()
}
}

// Close do release resource and clean up
func (test *TestCommonStruct) Close() {
framework.CloseSession(test.ssn)
framework.CleanupPluginBuilders()
close(test.stop)
}

// CheckAll checks all the need status
func (test *TestCommonStruct) CheckAll(caseIndex int) (err error) {
if err = test.CheckBind(caseIndex); err != nil {
return
}
if err = test.CheckEvict(caseIndex); err != nil {
return
}
if err = test.CheckPipelined(caseIndex); err != nil {
return
}
return test.CheckPGStatus(caseIndex)
}

// CheckBind check expected bind result
func (test *TestCommonStruct) CheckBind(caseIndex int) error {
binder := test.binder.(*util.FakeBinder)
for i := 0; i < test.BindsNum; i++ {
select {
case <-binder.Channel:
case <-time.After(300 * time.Millisecond):
return fmt.Errorf("Failed to get Bind request in case %d(%s).", caseIndex, test.Name)
}
}

if len(test.Bind) != len(binder.Binds) {
return fmt.Errorf("case %d(%s) check bind: \nwant: %v, \ngot %v ", caseIndex, test.Name, test.Bind, binder.Binds)
}
for key, value := range test.Bind {
got := binder.Binds[key]
if value != got {
return fmt.Errorf("case %d(%s) check bind: \nwant: %v->%v\n got: %v->%v ", caseIndex, test.Name, key, value, key, got)
}
}
return nil
}

// CheckEvict check the evicted result
func (test *TestCommonStruct) CheckEvict(caseIndex int) error {
evictor := test.evictor.(*util.FakeEvictor)
for i := 0; i < test.EvictNum; i++ {
select {
case <-evictor.Channel:
case <-time.After(300 * time.Millisecond):
return fmt.Errorf("Failed to get Evict request in case %d(%s).", caseIndex, test.Name)
}
}

evicts := evictor.Evicts()
if len(test.Evicted) != len(evicts) {
return fmt.Errorf("case %d(%s) check evict: \nwant: %v, \ngot %v ", caseIndex, test.Name, test.Evicted, evicts)
}

expect := map[string]int{} // evicted number
got := map[string]int{}
for _, v := range test.Evicted {
expect[v]++
}
for _, v := range evicts {
got[v]++
}

if !reflect.DeepEqual(expect, got) {
return fmt.Errorf("case %d(%s) check evict: \nwant: %v\n got: %v ", caseIndex, test.Name, expect, got)
}
return nil
}

// CheckPGStatus check job's podgroups status
func (test *TestCommonStruct) CheckPGStatus(caseIndex int) error {
ssn := test.ssn
for jobID, phase := range test.Status {
job := ssn.Jobs[jobID]
if job == nil {
return fmt.Errorf("case %d(%s) check podgroup status, job <%v> doesn't exist in session", caseIndex, test.Name, jobID)
}
got := job.PodGroup.Status.Phase
if phase != got {
return fmt.Errorf("case %d(%s) check podgroup <%v> status:\n want: %v, got: %v", caseIndex, test.Name, jobID, phase, got)
}
}
return nil
}

// CheckPipelined checks pipeline results
func (test *TestCommonStruct) CheckPipelined(caseIndex int) error {
ssn := test.ssn
for jobID, nodes := range test.PipeLined {
job := ssn.Jobs[api.JobID(jobID)]
if job == nil {
return fmt.Errorf("case %d(%s) check pipeline, job <%v> doesn't exist in session", caseIndex, test.Name, jobID)
}
pipeLined := job.TaskStatusIndex[api.Pipelined]
if len(pipeLined) == 0 {
return fmt.Errorf("case %d(%s) check pipeline, want pipelined job: %v, actualy, no tasks pipelined to nodes %v", caseIndex, test.Name, jobID, nodes)
}
for _, task := range pipeLined {
if !Contains(nodes, task.NodeName) {
return fmt.Errorf("case %d(%s) check pipeline: actual: %v->%v, want: %v->%v", caseIndex, test.Name, task.Name, task.NodeName, task.Name, nodes)
}
}
}
return nil
}
Loading

0 comments on commit 0a26eb8

Please sign in to comment.