diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go index 7d91e203abc..aac46785ee1 100644 --- a/cmd/scheduler/app/options/options.go +++ b/cmd/scheduler/app/options/options.go @@ -60,8 +60,6 @@ type ServerOption struct { PrintVersion bool EnableMetrics bool ListenAddress string - EnablePriorityClass bool - EnableCSIStorage bool // vc-scheduler will load (not activate) custom plugins which are in this directory PluginsDir string EnableHealthz bool @@ -107,8 +105,6 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", defaultLockObjectNamespace, "Define the namespace of the lock object that is used for leader election; it is volcano-system by default") fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.") fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.") - fs.BoolVar(&s.EnablePriorityClass, "priority-class", true, - "Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false") fs.Float32Var(&s.KubeClientOptions.QPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeClientOptions.Burst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver") @@ -122,8 +118,6 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.Int32Var(&s.PercentageOfNodesToFind, "percentage-nodes-to-find", defaultPercentageOfNodesToFind, "The percentage of nodes to find and score, if <=0 will be calcuated based on the cluster size") fs.StringVar(&s.PluginsDir, "plugins-dir", defaultPluginsDir, "vc-scheduler will load custom plugins which are in this directory") - fs.BoolVar(&s.EnableCSIStorage, "csi-storage", false, - "Enable tracking of available storage capacity that CSI drivers provide; it is false by default") fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default") fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default") fs.StringSliceVar(&s.NodeSelector, "node-selector", nil, "volcano only work with the labeled node, like: --node-selector=volcano.sh/role:train --node-selector=volcano.sh/role:serving") diff --git a/cmd/scheduler/app/options/options_test.go b/cmd/scheduler/app/options/options_test.go index e815e6ff3e1..fda49af371c 100644 --- a/cmd/scheduler/app/options/options_test.go +++ b/cmd/scheduler/app/options/options_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/spf13/pflag" + "github.com/stretchr/testify/assert" "volcano.sh/volcano/pkg/kube" ) @@ -33,10 +34,10 @@ func TestAddFlags(t *testing.T) { args := []string{ "--schedule-period=5m", - "--priority-class=false", "--cache-dumper=false", } - fs.Parse(args) + err := fs.Parse(args) + assert.NoError(t, err) // This is a snapshot of expected options parsed by args. expected := &ServerOption{ diff --git a/go.mod b/go.mod index 45bf2488919..68c4e873d56 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/prometheus/common v0.32.1 github.com/spf13/cobra v1.4.0 github.com/spf13/pflag v1.0.5 + github.com/stretchr/testify v1.7.0 go.uber.org/automaxprocs v1.4.0 golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd golang.org/x/time v0.0.0-20220609170525-579cf78fd858 @@ -78,6 +79,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/selinux v1.10.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect golang.org/x/mod v0.8.0 // indirect diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 6d7a5b11717..76fe4870718 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" kubeschedulinginformers "k8s.io/client-go/informers/scheduling/v1" @@ -52,6 +53,7 @@ import ( jobcache "volcano.sh/volcano/pkg/controllers/cache" "volcano.sh/volcano/pkg/controllers/framework" "volcano.sh/volcano/pkg/controllers/job/state" + "volcano.sh/volcano/pkg/features" ) func init() { @@ -151,39 +153,43 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error { factory := informerfactory.NewSharedInformerFactory(cc.vcClient, 0) cc.vcInformerFactory = factory - cc.jobInformer = factory.Batch().V1alpha1().Jobs() - cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: cc.addJob, - UpdateFunc: cc.updateJob, - DeleteFunc: cc.deleteJob, - }) - cc.jobLister = cc.jobInformer.Lister() - cc.jobSynced = cc.jobInformer.Informer().HasSynced - - cc.cmdInformer = factory.Bus().V1alpha1().Commands() - cc.cmdInformer.Informer().AddEventHandler( - cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch v := obj.(type) { - case *busv1alpha1.Command: - if v.TargetObject != nil && - v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() && - v.TargetObject.Kind == "Job" { - return true - } + if utilfeature.DefaultFeatureGate.Enabled(features.WorkLoadSupport) { + cc.jobInformer = factory.Batch().V1alpha1().Jobs() + cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addJob, + UpdateFunc: cc.updateJob, + DeleteFunc: cc.deleteJob, + }) + cc.jobLister = cc.jobInformer.Lister() + cc.jobSynced = cc.jobInformer.Informer().HasSynced + } - return false - default: - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: cc.addCommand, + if utilfeature.DefaultFeatureGate.Enabled(features.QueueCommandSync) { + cc.cmdInformer = factory.Bus().V1alpha1().Commands() + cc.cmdInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch v := obj.(type) { + case *busv1alpha1.Command: + if v.TargetObject != nil && + v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() && + v.TargetObject.Kind == "Job" { + return true + } + + return false + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addCommand, + }, }, - }, - ) - cc.cmdLister = cc.cmdInformer.Lister() - cc.cmdSynced = cc.cmdInformer.Informer().HasSynced + ) + cc.cmdLister = cc.cmdInformer.Lister() + cc.cmdSynced = cc.cmdInformer.Informer().HasSynced + } cc.podInformer = sharedInformers.Core().V1().Pods() cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -210,9 +216,11 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error { cc.pgLister = cc.pgInformer.Lister() cc.pgSynced = cc.pgInformer.Informer().HasSynced - cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses() - cc.pcLister = cc.pcInformer.Lister() - cc.pcSynced = cc.pcInformer.Informer().HasSynced + if utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) { + cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses() + cc.pcLister = cc.pcInformer.Lister() + cc.pcSynced = cc.pcInformer.Informer().HasSynced + } cc.queueInformer = factory.Scheduling().V1beta1().Queues() cc.queueLister = cc.queueInformer.Lister() diff --git a/pkg/controllers/podgroup/pg_controller.go b/pkg/controllers/podgroup/pg_controller.go index bb0272e1186..395bfecc254 100644 --- a/pkg/controllers/podgroup/pg_controller.go +++ b/pkg/controllers/podgroup/pg_controller.go @@ -18,6 +18,7 @@ package podgroup import ( "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" appinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" @@ -34,6 +35,7 @@ import ( schedulinginformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" schedulinglister "volcano.sh/apis/pkg/client/listers/scheduling/v1beta1" "volcano.sh/volcano/pkg/controllers/framework" + "volcano.sh/volcano/pkg/features" commonutil "volcano.sh/volcano/pkg/util" ) @@ -103,13 +105,14 @@ func (pg *pgcontroller) Initialize(opt *framework.ControllerOption) error { pg.pgLister = pg.pgInformer.Lister() pg.pgSynced = pg.pgInformer.Informer().HasSynced - pg.rsInformer = pg.informerFactory.Apps().V1().ReplicaSets() - pg.rsSynced = pg.rsInformer.Informer().HasSynced - pg.rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: pg.addReplicaSet, - UpdateFunc: pg.updateReplicaSet, - }) - + if utilfeature.DefaultFeatureGate.Enabled(features.WorkLoadSupport) { + pg.rsInformer = pg.informerFactory.Apps().V1().ReplicaSets() + pg.rsSynced = pg.rsInformer.Informer().HasSynced + pg.rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: pg.addReplicaSet, + UpdateFunc: pg.updateReplicaSet, + }) + } return nil } diff --git a/pkg/controllers/queue/queue_controller.go b/pkg/controllers/queue/queue_controller.go index ffdc252df19..1f38bc78a20 100644 --- a/pkg/controllers/queue/queue_controller.go +++ b/pkg/controllers/queue/queue_controller.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" @@ -46,6 +47,7 @@ import ( "volcano.sh/volcano/pkg/controllers/apis" "volcano.sh/volcano/pkg/controllers/framework" queuestate "volcano.sh/volcano/pkg/controllers/queue/state" + "volcano.sh/volcano/pkg/features" ) func init() { @@ -137,22 +139,24 @@ func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error { DeleteFunc: c.deletePodGroup, }) - c.cmdInformer = factory.Bus().V1alpha1().Commands() - c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch v := obj.(type) { - case *busv1alpha1.Command: - return IsQueueReference(v.TargetObject) - default: - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: c.addCommand, - }, - }) - c.cmdLister = c.cmdInformer.Lister() - c.cmdSynced = c.cmdInformer.Informer().HasSynced + if utilfeature.DefaultFeatureGate.Enabled(features.QueueCommandSync) { + c.cmdInformer = factory.Bus().V1alpha1().Commands() + c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch v := obj.(type) { + case *busv1alpha1.Command: + return IsQueueReference(v.TargetObject) + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: c.addCommand, + }, + }) + c.cmdLister = c.cmdInformer.Lister() + c.cmdSynced = c.cmdInformer.Informer().HasSynced + } queuestate.SyncQueue = c.syncQueue queuestate.OpenQueue = c.openQueue diff --git a/pkg/features/volcano_features.go b/pkg/features/volcano_features.go new file mode 100644 index 00000000000..6de854bc603 --- /dev/null +++ b/pkg/features/volcano_features.go @@ -0,0 +1,57 @@ +/* + Copyright 2023 The Volcano Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package features + +import ( + "k8s.io/apimachinery/pkg/util/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/component-base/featuregate" +) + +const ( + // WorkLoadSupport can cache and operate workload resource, Deployment/Replicas/ReplicationController/StatefulSet resources currently. + WorkLoadSupport featuregate.Feature = "WorkLoadSupport" + + // VolcanoJobSupport can identify and schedule volcano job. + VolcanoJobSupport featuregate.Feature = "VolcanoJobSupport" + + // QueueCommandSync supports queue command sync. + QueueCommandSync featuregate.Feature = "QueueCommandSync" + + // PriorityClass to provide the capacity of preemption at pod group level. + PriorityClass featuregate.Feature = "PriorityClass" + + // CSIStorage tracking of available storage capacity that CSI drivers provide + CSIStorage featuregate.Feature = "CSIStorage" + + // ResourceTopology supports resources like cpu/memory topology aware. + ResourceTopology featuregate.Feature = "ResourceTopology" +) + +func init() { + runtime.Must(utilfeature.DefaultMutableFeatureGate.Add(defaultVolcanoFeatureGates)) +} + +var defaultVolcanoFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ + WorkLoadSupport: {Default: true, PreRelease: featuregate.Alpha}, + VolcanoJobSupport: {Default: true, PreRelease: featuregate.Alpha}, + QueueCommandSync: {Default: true, PreRelease: featuregate.Alpha}, + PriorityClass: {Default: true, PreRelease: featuregate.Alpha}, + // CSIStorage is explicitly set to false by default. + CSIStorage: {Default: false, PreRelease: featuregate.Alpha}, + ResourceTopology: {Default: true, PreRelease: featuregate.Alpha}, +} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 1eb1558287f..2b11cd60f2a 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -33,6 +33,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" infov1 "k8s.io/client-go/informers/core/v1" schedv1 "k8s.io/client-go/informers/scheduling/v1" @@ -59,7 +60,7 @@ import ( cpuinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/nodeinfo/v1alpha1" vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" - "volcano.sh/volcano/cmd/scheduler/app/options" + "volcano.sh/volcano/pkg/features" schedulingapi "volcano.sh/volcano/pkg/scheduler/api" volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" "volcano.sh/volcano/pkg/scheduler/metrics" @@ -501,9 +502,11 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu // `SelectorSpread` and `PodTopologySpread` plugins uses the following four so far. informerFactory.Core().V1().Namespaces().Informer() informerFactory.Core().V1().Services().Informer() - informerFactory.Core().V1().ReplicationControllers().Informer() - informerFactory.Apps().V1().ReplicaSets().Informer() - informerFactory.Apps().V1().StatefulSets().Informer() + if utilfeature.DefaultFeatureGate.Enabled(features.WorkLoadSupport) { + informerFactory.Core().V1().ReplicationControllers().Informer() + informerFactory.Apps().V1().ReplicaSets().Informer() + informerFactory.Apps().V1().StatefulSets().Informer() + } // create informer for node information sc.nodeInformer = informerFactory.Core().V1().Nodes() @@ -561,11 +564,11 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu DeleteFunc: sc.DeleteCSINode, }, ) - sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers() - sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities() var capacityCheck *volumescheduling.CapacityCheck - if options.ServerOpts.EnableCSIStorage { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) { + sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers() + sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities() capacityCheck = &volumescheduling.CapacityCheck{ CSIDriverInformer: sc.csiDriverInformer, CSIStorageCapacityInformer: sc.csiStorageCapacityInformer, @@ -621,7 +624,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu }, }) - if options.ServerOpts.EnablePriorityClass { + if utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) { sc.pcInformer = informerFactory.Scheduling().V1().PriorityClasses() sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: sc.AddPriorityClass, @@ -677,12 +680,14 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu DeleteFunc: sc.DeleteQueueV1beta1, }) - sc.cpuInformer = vcinformers.Nodeinfo().V1alpha1().Numatopologies() - sc.cpuInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: sc.AddNumaInfoV1alpha1, - UpdateFunc: sc.UpdateNumaInfoV1alpha1, - DeleteFunc: sc.DeleteNumaInfoV1alpha1, - }) + if utilfeature.DefaultFeatureGate.Enabled(features.ResourceTopology) { + sc.cpuInformer = vcinformers.Nodeinfo().V1alpha1().Numatopologies() + sc.cpuInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: sc.AddNumaInfoV1alpha1, + UpdateFunc: sc.UpdateNumaInfoV1alpha1, + DeleteFunc: sc.DeleteNumaInfoV1alpha1, + }) + } return sc } diff --git a/pkg/scheduler/capabilities/volumebinding/volume_binding.go b/pkg/scheduler/capabilities/volumebinding/volume_binding.go index 5edcb651c6d..c693b6a526e 100644 --- a/pkg/scheduler/capabilities/volumebinding/volume_binding.go +++ b/pkg/scheduler/capabilities/volumebinding/volume_binding.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/component-helpers/storage/ephemeral" "k8s.io/klog/v2" @@ -36,7 +37,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" - "volcano.sh/volcano/cmd/scheduler/app/options" + "volcano.sh/volcano/pkg/features" ) const ( @@ -377,7 +378,7 @@ func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (fram storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses() csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes() var capacityCheck *CapacityCheck - if options.ServerOpts.EnableCSIStorage { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) { capacityCheck = &CapacityCheck{ CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(), CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1beta1().CSIStorageCapacities(), diff --git a/pkg/scheduler/capabilities/volumebinding/volume_binding_test.go b/pkg/scheduler/capabilities/volumebinding/volume_binding_test.go index 8f32dc34767..55f9d5c1164 100644 --- a/pkg/scheduler/capabilities/volumebinding/volume_binding_test.go +++ b/pkg/scheduler/capabilities/volumebinding/volume_binding_test.go @@ -21,8 +21,6 @@ import ( "reflect" "testing" - "volcano.sh/volcano/cmd/scheduler/app/options" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" v1 "k8s.io/api/core/v1" @@ -30,12 +28,16 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + + "volcano.sh/volcano/pkg/features" ) var ( @@ -593,9 +595,7 @@ func TestVolumeBinding(t *testing.T) { }, } - options.ServerOpts = &options.ServerOption{ - EnableCSIStorage: true, - } + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIStorage, true)() for _, item := range table { t.Run(item.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background())