Skip to content

Commit

Permalink
Merge pull request volcano-sh#3497 from y-ykcir/sharedinformer
Browse files Browse the repository at this point in the history
optimizate controllers with one sharedInformerFactory
  • Loading branch information
volcano-sh-bot authored Jun 3, 2024
2 parents cc21814 + 384f433 commit 84dd53d
Show file tree
Hide file tree
Showing 15 changed files with 106 additions and 60 deletions.
2 changes: 2 additions & 0 deletions cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"volcano.sh/apis/pkg/apis/helpers"
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
"volcano.sh/volcano/cmd/controller-manager/app/options"
"volcano.sh/volcano/pkg/controllers/framework"
"volcano.sh/volcano/pkg/kube"
Expand Down Expand Up @@ -122,6 +123,7 @@ func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx c
controllerOpt.KubeClient = kubeclientset.NewForConfigOrDie(config)
controllerOpt.VolcanoClient = vcclientset.NewForConfigOrDie(config)
controllerOpt.SharedInformerFactory = informers.NewSharedInformerFactory(controllerOpt.KubeClient, 0)
controllerOpt.VCSharedInformerFactory = informerfactory.NewSharedInformerFactory(controllerOpt.VolcanoClient, 0)
controllerOpt.InheritOwnerAnnotations = opt.InheritOwnerAnnotations
controllerOpt.WorkerThreadsForPG = opt.WorkerThreadsForPG
controllerOpt.WorkerThreadsForGC = opt.WorkerThreadsForGC
Expand Down
14 changes: 8 additions & 6 deletions pkg/controllers/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ import (
"k8s.io/client-go/kubernetes"

vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
)

// ControllerOption is the main context object for the controllers.
type ControllerOption struct {
KubeClient kubernetes.Interface
VolcanoClient vcclientset.Interface
SharedInformerFactory informers.SharedInformerFactory
SchedulerNames []string
WorkerNum uint32
MaxRequeueNum int
KubeClient kubernetes.Interface
VolcanoClient vcclientset.Interface
SharedInformerFactory informers.SharedInformerFactory
VCSharedInformerFactory vcinformer.SharedInformerFactory
SchedulerNames []string
WorkerNum uint32
MaxRequeueNum int

InheritOwnerAnnotations bool
WorkerThreadsForPG uint32
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/garbagecollector/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"volcano.sh/apis/pkg/apis/batch/v1alpha1"
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
batchinformers "volcano.sh/apis/pkg/client/informers/externalversions/batch/v1alpha1"
batchlisters "volcano.sh/apis/pkg/client/listers/batch/v1alpha1"
Expand Down Expand Up @@ -75,7 +74,7 @@ func (gc *gccontroller) Name() string {
func (gc *gccontroller) Initialize(opt *framework.ControllerOption) error {
gc.vcClient = opt.VolcanoClient

factory := informerfactory.NewSharedInformerFactory(gc.vcClient, 0)
factory := opt.VCSharedInformerFactory
jobInformer := factory.Batch().V1alpha1().Jobs()

gc.vcInformerFactory = factory
Expand Down
22 changes: 17 additions & 5 deletions pkg/controllers/garbagecollector/garbagecollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,27 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"volcano.sh/apis/pkg/apis/batch/v1alpha1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned/fake"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
"volcano.sh/volcano/pkg/controllers/framework"
)

func newFakeController() *gccontroller {
volcanoClientSet := volcanoclient.NewSimpleClientset()
vcSharedInformers := informerfactory.NewSharedInformerFactory(volcanoClientSet, 0)

controller := &gccontroller{}
opt := &framework.ControllerOption{
VolcanoClient: volcanoClientSet,
VCSharedInformerFactory: vcSharedInformers,
}

controller.Initialize(opt)

return controller
}

func TestGarbageCollector_ProcessJob(t *testing.T) {

}
Expand Down Expand Up @@ -84,10 +99,7 @@ func TestGarbageCollector_ProcessTTL(t *testing.T) {
},
}
for i, testcase := range testcases {
gc := &gccontroller{}
gc.Initialize(&framework.ControllerOption{
VolcanoClient: volcanoclient.NewSimpleClientset(),
})
gc := newFakeController()

expired, err := gc.processTTL(testcase.Job)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
vcscheme "volcano.sh/apis/pkg/client/clientset/versioned/scheme"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
batchinformer "volcano.sh/apis/pkg/client/informers/externalversions/batch/v1alpha1"
businformer "volcano.sh/apis/pkg/client/informers/externalversions/bus/v1alpha1"
Expand Down Expand Up @@ -152,7 +151,7 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
}

factory := informerfactory.NewSharedInformerFactory(cc.vcClient, 0)
factory := opt.VCSharedInformerFactory
cc.vcInformerFactory = factory
if utilfeature.DefaultFeatureGate.Enabled(features.WorkLoadSupport) {
cc.jobInformer = factory.Batch().V1alpha1().Jobs()
Expand Down
11 changes: 7 additions & 4 deletions pkg/controllers/job/job_controller_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"volcano.sh/apis/pkg/apis/helpers"
scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
"volcano.sh/volcano/pkg/controllers/framework"
)

Expand All @@ -53,13 +54,15 @@ func newController() *jobcontroller {
})

sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)
vcSharedInformers := informerfactory.NewSharedInformerFactory(vcclient, 0)

controller := &jobcontroller{}
opt := &framework.ControllerOption{
VolcanoClient: vcclient,
KubeClient: kubeClientSet,
SharedInformerFactory: sharedInformers,
WorkerNum: 3,
VolcanoClient: vcclient,
KubeClient: kubeClientSet,
SharedInformerFactory: sharedInformers,
VCSharedInformerFactory: vcSharedInformers,
WorkerNum: 3,
}

controller.Initialize(opt)
Expand Down
11 changes: 7 additions & 4 deletions pkg/controllers/job/job_controller_plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned/fake"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
"volcano.sh/volcano/pkg/controllers/framework"
)

Expand All @@ -36,13 +37,15 @@ func newFakeController() *jobcontroller {
kubeClientSet := kubeclient.NewSimpleClientset()

sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)
vcSharedInformers := informerfactory.NewSharedInformerFactory(volcanoClientSet, 0)

controller := &jobcontroller{}
opt := &framework.ControllerOption{
VolcanoClient: volcanoClientSet,
KubeClient: kubeClientSet,
SharedInformerFactory: sharedInformers,
WorkerNum: 3,
VolcanoClient: volcanoClientSet,
KubeClient: kubeClientSet,
SharedInformerFactory: sharedInformers,
VCSharedInformerFactory: vcSharedInformers,
WorkerNum: 3,
}

controller.Initialize(opt)
Expand Down
29 changes: 18 additions & 11 deletions pkg/controllers/jobflow/jobflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,17 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

jobflowstate "volcano.sh/volcano/pkg/controllers/jobflow/state"

vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
versionedscheme "volcano.sh/apis/pkg/client/clientset/versioned/scheme"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
batchinformer "volcano.sh/apis/pkg/client/informers/externalversions/batch/v1alpha1"
flowinformer "volcano.sh/apis/pkg/client/informers/externalversions/flow/v1alpha1"
batchlister "volcano.sh/apis/pkg/client/listers/batch/v1alpha1"
flowlister "volcano.sh/apis/pkg/client/listers/flow/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
"volcano.sh/volcano/pkg/controllers/framework"
"volcano.sh/volcano/pkg/controllers/jobflow/state"
jobflowstate "volcano.sh/volcano/pkg/controllers/jobflow/state"
)

func init() {
Expand All @@ -58,6 +57,9 @@ type jobflowcontroller struct {
jobTemplateInformer flowinformer.JobTemplateInformer
jobInformer batchinformer.JobInformer

//InformerFactory
vcInformerFactory vcinformer.SharedInformerFactory

//jobFlowLister
jobFlowLister flowlister.JobFlowLister
jobFlowSynced cache.InformerSynced
Expand Down Expand Up @@ -89,19 +91,22 @@ func (jf *jobflowcontroller) Initialize(opt *framework.ControllerOption) error {
jf.kubeClient = opt.KubeClient
jf.vcClient = opt.VolcanoClient

jf.jobFlowInformer = informerfactory.NewSharedInformerFactory(jf.vcClient, 0).Flow().V1alpha1().JobFlows()
factory := opt.VCSharedInformerFactory
jf.vcInformerFactory = factory

jf.jobFlowInformer = factory.Flow().V1alpha1().JobFlows()
jf.jobFlowSynced = jf.jobFlowInformer.Informer().HasSynced
jf.jobFlowLister = jf.jobFlowInformer.Lister()
jf.jobFlowInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jf.addJobFlow,
UpdateFunc: jf.updateJobFlow,
})

jf.jobTemplateInformer = informerfactory.NewSharedInformerFactory(jf.vcClient, 0).Flow().V1alpha1().JobTemplates()
jf.jobTemplateInformer = factory.Flow().V1alpha1().JobTemplates()
jf.jobTemplateSynced = jf.jobTemplateInformer.Informer().HasSynced
jf.jobTemplateLister = jf.jobTemplateInformer.Lister()

jf.jobInformer = informerfactory.NewSharedInformerFactory(jf.vcClient, 0).Batch().V1alpha1().Jobs()
jf.jobInformer = factory.Batch().V1alpha1().Jobs()
jf.jobSynced = jf.jobInformer.Informer().HasSynced
jf.jobLister = jf.jobInformer.Lister()
jf.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -131,11 +136,13 @@ func (jf *jobflowcontroller) Initialize(opt *framework.ControllerOption) error {
func (jf *jobflowcontroller) Run(stopCh <-chan struct{}) {
defer jf.queue.ShutDown()

go jf.jobFlowInformer.Informer().Run(stopCh)
go jf.jobTemplateInformer.Informer().Run(stopCh)
go jf.jobInformer.Informer().Run(stopCh)

cache.WaitForCacheSync(stopCh, jf.jobSynced, jf.jobFlowSynced, jf.jobTemplateSynced)
jf.vcInformerFactory.Start(stopCh)
for informerType, ok := range jf.vcInformerFactory.WaitForCacheSync(stopCh) {
if !ok {
klog.Errorf("caches failed to sync: %v", informerType)
return
}
}

go wait.Until(jf.worker, time.Second, stopCh)

Expand Down
11 changes: 7 additions & 4 deletions pkg/controllers/jobflow/jobflow_controller_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"volcano.sh/apis/pkg/apis/helpers"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned/fake"
"volcano.sh/apis/pkg/client/clientset/versioned/scheme"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
"volcano.sh/volcano/pkg/controllers/framework"
)

Expand All @@ -40,13 +41,15 @@ func newFakeController() *jobflowcontroller {
kubeClientSet := kubeclient.NewSimpleClientset()

sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)
vcSharedInformers := informerfactory.NewSharedInformerFactory(volcanoClientSet, 0)

controller := &jobflowcontroller{}
opt := &framework.ControllerOption{
VolcanoClient: volcanoClientSet,
KubeClient: kubeClientSet,
SharedInformerFactory: sharedInformers,
WorkerNum: 3,
VolcanoClient: volcanoClientSet,
KubeClient: kubeClientSet,
SharedInformerFactory: sharedInformers,
VCSharedInformerFactory: vcSharedInformers,
WorkerNum: 3,
}

controller.Initialize(opt)
Expand Down
23 changes: 16 additions & 7 deletions pkg/controllers/jobtemplate/jobtemplate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
versionedscheme "volcano.sh/apis/pkg/client/clientset/versioned/scheme"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
batchinformer "volcano.sh/apis/pkg/client/informers/externalversions/batch/v1alpha1"
flowinformer "volcano.sh/apis/pkg/client/informers/externalversions/flow/v1alpha1"
batchlister "volcano.sh/apis/pkg/client/listers/batch/v1alpha1"
Expand All @@ -54,6 +54,9 @@ type jobtemplatecontroller struct {
jobTemplateInformer flowinformer.JobTemplateInformer
jobInformer batchinformer.JobInformer

//InformerFactory
vcInformerFactory vcinformer.SharedInformerFactory

//jobTemplateLister
jobTemplateLister flowlister.JobTemplateLister
jobTemplateSynced cache.InformerSynced
Expand Down Expand Up @@ -81,14 +84,17 @@ func (jt *jobtemplatecontroller) Initialize(opt *framework.ControllerOption) err
jt.kubeClient = opt.KubeClient
jt.vcClient = opt.VolcanoClient

jt.jobTemplateInformer = informerfactory.NewSharedInformerFactory(jt.vcClient, 0).Flow().V1alpha1().JobTemplates()
factory := opt.VCSharedInformerFactory
jt.vcInformerFactory = factory

jt.jobTemplateInformer = factory.Flow().V1alpha1().JobTemplates()
jt.jobTemplateSynced = jt.jobTemplateInformer.Informer().HasSynced
jt.jobTemplateLister = jt.jobTemplateInformer.Lister()
jt.jobTemplateInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jt.addJobTemplate,
})

jt.jobInformer = informerfactory.NewSharedInformerFactory(jt.vcClient, 0).Batch().V1alpha1().Jobs()
jt.jobInformer = factory.Batch().V1alpha1().Jobs()
jt.jobSynced = jt.jobInformer.Informer().HasSynced
jt.jobLister = jt.jobInformer.Lister()
jt.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -117,10 +123,13 @@ func (jt *jobtemplatecontroller) Initialize(opt *framework.ControllerOption) err
func (jt *jobtemplatecontroller) Run(stopCh <-chan struct{}) {
defer jt.queue.ShutDown()

go jt.jobTemplateInformer.Informer().Run(stopCh)
go jt.jobInformer.Informer().Run(stopCh)

cache.WaitForCacheSync(stopCh, jt.jobSynced, jt.jobTemplateSynced)
jt.vcInformerFactory.Start(stopCh)
for informerType, ok := range jt.vcInformerFactory.WaitForCacheSync(stopCh) {
if !ok {
klog.Errorf("caches failed to sync: %v", informerType)
return
}
}

go wait.Until(jt.worker, time.Second, stopCh)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (

"volcano.sh/apis/pkg/apis/batch/v1alpha1"
jobflowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1"

volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned/fake"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
"volcano.sh/volcano/pkg/controllers/framework"
)

Expand All @@ -37,13 +37,15 @@ func newFakeController() *jobtemplatecontroller {
kubeClientSet := kubeclient.NewSimpleClientset()

sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)
vcSharedInformers := informerfactory.NewSharedInformerFactory(volcanoClientSet, 0)

controller := &jobtemplatecontroller{}
opt := &framework.ControllerOption{
VolcanoClient: volcanoClientSet,
KubeClient: kubeClientSet,
SharedInformerFactory: sharedInformers,
WorkerNum: 3,
VolcanoClient: volcanoClientSet,
KubeClient: kubeClientSet,
SharedInformerFactory: sharedInformers,
VCSharedInformerFactory: vcSharedInformers,
WorkerNum: 3,
}

controller.Initialize(opt)
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/podgroup/pg_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
schedulinginformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
schedulinglister "volcano.sh/apis/pkg/client/listers/scheduling/v1beta1"
Expand Down Expand Up @@ -99,7 +98,7 @@ func (pg *pgcontroller) Initialize(opt *framework.ControllerOption) error {
AddFunc: pg.addPod,
})

factory := informerfactory.NewSharedInformerFactory(pg.vcClient, 0)
factory := opt.VCSharedInformerFactory
pg.vcInformerFactory = factory
pg.pgInformer = factory.Scheduling().V1beta1().PodGroups()
pg.pgLister = pg.pgInformer.Lister()
Expand Down
Loading

0 comments on commit 84dd53d

Please sign in to comment.