From 42a81bd77584d83e835d7e8f0f95ff0fd33a062f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oliver=20B=C3=A4hler?= Date: Fri, 22 Mar 2024 13:37:24 +0100 Subject: [PATCH] feat: add flag --workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Oliver Bähler --- controllers/pod/metadata.go | 4 +++- controllers/pv/controller.go | 4 +++- controllers/rbac/manager.go | 4 +++- controllers/resources/global.go | 4 +++- controllers/resources/namespaced.go | 4 +++- controllers/servicelabels/endpoint.go | 4 +++- controllers/servicelabels/service.go | 4 +++- controllers/tenant/manager.go | 4 +++- main.go | 19 ++++++++++--------- 9 files changed, 34 insertions(+), 17 deletions(-) diff --git a/controllers/pod/metadata.go b/controllers/pod/metadata.go index 9c9d772f6..787790cea 100644 --- a/controllers/pod/metadata.go +++ b/controllers/pod/metadata.go @@ -15,6 +15,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -123,8 +124,9 @@ func (m *MetadataReconciler) isNamespaceInTenant(ctx context.Context, namespace return len(tl.Items) > 0 } -func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error { return ctrl.NewControllerManagedBy(mgr). For(&corev1.Pod{}, m.forOptionPerInstanceName(ctx)). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(m) } diff --git a/controllers/pv/controller.go b/controllers/pv/controller.go index b4753b740..e743d4175 100644 --- a/controllers/pv/controller.go +++ b/controllers/pv/controller.go @@ -12,6 +12,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" log2 "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -88,7 +89,7 @@ func (c *Controller) Reconcile(ctx context.Context, request reconcile.Request) ( return reconcile.Result{}, nil } -func (c *Controller) SetupWithManager(mgr ctrl.Manager) error { +func (c *Controller) SetupWithManager(mgr ctrl.Manager, workerCount int) error { label, err := capsuleutils.GetTypeLabel(&capsulev1beta2.Tenant{}) if err != nil { return err @@ -113,5 +114,6 @@ func (c *Controller) SetupWithManager(mgr ctrl.Manager) error { return !ok }))). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(c) } diff --git a/controllers/rbac/manager.go b/controllers/rbac/manager.go index ce4319153..24a7e36a5 100644 --- a/controllers/rbac/manager.go +++ b/controllers/rbac/manager.go @@ -15,6 +15,7 @@ import ( "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -32,11 +33,12 @@ type Manager struct { } //nolint:revive -func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string) (err error) { +func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string, workerCount int) (err error) { namesPredicate := utils.NamesMatchingPredicate(ProvisionerRoleName, DeleterRoleName) crErr := ctrl.NewControllerManagedBy(mgr). For(&rbacv1.ClusterRole{}, namesPredicate). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) if crErr != nil { err = multierror.Append(err, crErr) diff --git a/controllers/resources/global.go b/controllers/resources/global.go index 4e8fa7d39..88ba1809a 100644 --- a/controllers/resources/global.go +++ b/controllers/resources/global.go @@ -16,6 +16,7 @@ import ( "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" ctrllog "sigs.k8s.io/controller-runtime/pkg/log" @@ -63,7 +64,7 @@ func (r *Global) enqueueRequestFromTenant(ctx context.Context, object client.Obj return reqs } -func (r *Global) SetupWithManager(mgr ctrl.Manager) error { +func (r *Global) SetupWithManager(mgr ctrl.Manager, workerCount int) error { r.client = mgr.GetClient() r.processor = Processor{ client: mgr.GetClient(), @@ -72,6 +73,7 @@ func (r *Global) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&capsulev1beta2.GlobalTenantResource{}). Watches(&capsulev1beta2.Tenant{}, handler.EnqueueRequestsFromMapFunc(r.enqueueRequestFromTenant)). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) } diff --git a/controllers/resources/namespaced.go b/controllers/resources/namespaced.go index 60eadcbd3..f2cb27503 100644 --- a/controllers/resources/namespaced.go +++ b/controllers/resources/namespaced.go @@ -14,6 +14,7 @@ import ( "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ctrllog "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -26,7 +27,7 @@ type Namespaced struct { processor Processor } -func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error { +func (r *Namespaced) SetupWithManager(mgr ctrl.Manager, workerCount int) error { r.client = mgr.GetClient() r.processor = Processor{ client: mgr.GetClient(), @@ -34,6 +35,7 @@ func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&capsulev1beta2.TenantResource{}). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) } diff --git a/controllers/servicelabels/endpoint.go b/controllers/servicelabels/endpoint.go index 60a451844..db877ac44 100644 --- a/controllers/servicelabels/endpoint.go +++ b/controllers/servicelabels/endpoint.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller" ) type EndpointsLabelsReconciler struct { @@ -17,7 +18,7 @@ type EndpointsLabelsReconciler struct { Log logr.Logger } -func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error { r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{ obj: &corev1.Endpoints{}, client: mgr.GetClient(), @@ -26,5 +27,6 @@ func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ct return ctrl.NewControllerManagedBy(mgr). For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) } diff --git a/controllers/servicelabels/service.go b/controllers/servicelabels/service.go index 74d1368b3..e1ff8dfa0 100644 --- a/controllers/servicelabels/service.go +++ b/controllers/servicelabels/service.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller" ) type ServicesLabelsReconciler struct { @@ -17,7 +18,7 @@ type ServicesLabelsReconciler struct { Log logr.Logger } -func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error { r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{ obj: &corev1.Service{}, client: mgr.GetClient(), @@ -26,5 +27,6 @@ func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctr return ctrl.NewControllerManagedBy(mgr). For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) } diff --git a/controllers/tenant/manager.go b/controllers/tenant/manager.go index f797bfa54..2a2f7f344 100644 --- a/controllers/tenant/manager.go +++ b/controllers/tenant/manager.go @@ -16,6 +16,7 @@ import ( "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" capsulev1beta2 "github.com/projectcapsule/capsule/api/v1beta2" @@ -28,7 +29,7 @@ type Manager struct { RESTConfig *rest.Config } -func (r *Manager) SetupWithManager(mgr ctrl.Manager) error { +func (r *Manager) SetupWithManager(mgr ctrl.Manager, workerCount int) error { return ctrl.NewControllerManagedBy(mgr). For(&capsulev1beta2.Tenant{}). Owns(&corev1.Namespace{}). @@ -36,6 +37,7 @@ func (r *Manager) SetupWithManager(mgr ctrl.Manager) error { Owns(&corev1.LimitRange{}). Owns(&corev1.ResourceQuota{}). Owns(&rbacv1.RoleBinding{}). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) } diff --git a/main.go b/main.go index f4918fbd9..b8155df05 100644 --- a/main.go +++ b/main.go @@ -83,7 +83,7 @@ func main() { var metricsAddr, namespace, configurationName string - var webhookPort int + var webhookPort, workerCount int var goFlagSet goflag.FlagSet @@ -94,6 +94,7 @@ func main() { "Enabling this will ensure there is only one active controller manager.") flag.BoolVar(&version, "version", false, "Print the Capsule version and exit") flag.StringVar(&configurationName, "configuration-name", "default", "The CapsuleConfiguration resource name to use") + flag.IntVar(&workerCount, "workers", 1, "Defines the number of concurrent reconciles the controller can handle") opts := zap.Options{ EncoderConfigOptions: append([]zap.EncoderConfigOption{}, func(config *zapcore.EncoderConfig) { @@ -194,7 +195,7 @@ func main() { Client: manager.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Tenant"), Recorder: manager.GetEventRecorderFor("tenant-controller"), - }).SetupWithManager(manager); err != nil { + }).SetupWithManager(manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Tenant") os.Exit(1) } @@ -254,21 +255,21 @@ func main() { os.Exit(1) } - if err = rbacManager.SetupWithManager(ctx, manager, configurationName); err != nil { + if err = rbacManager.SetupWithManager(ctx, manager, configurationName, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Rbac") os.Exit(1) } if err = (&servicelabelscontroller.ServicesLabelsReconciler{ Log: ctrl.Log.WithName("controllers").WithName("ServiceLabels"), - }).SetupWithManager(ctx, manager); err != nil { + }).SetupWithManager(ctx, manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ServiceLabels") os.Exit(1) } if err = (&servicelabelscontroller.EndpointsLabelsReconciler{ Log: ctrl.Log.WithName("controllers").WithName("EndpointLabels"), - }).SetupWithManager(ctx, manager); err != nil { + }).SetupWithManager(ctx, manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "EndpointLabels") os.Exit(1) } @@ -281,12 +282,12 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "EndpointSliceLabels") } - if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager); err != nil { + if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PodLabels") os.Exit(1) } - if err = (&pv.Controller{}).SetupWithManager(manager); err != nil { + if err = (&pv.Controller{}).SetupWithManager(manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PersistentVolume") os.Exit(1) } @@ -298,12 +299,12 @@ func main() { os.Exit(1) } - if err = (&resources.Global{}).SetupWithManager(manager); err != nil { + if err = (&resources.Global{}).SetupWithManager(manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "resources.Global") os.Exit(1) } - if err = (&resources.Namespaced{}).SetupWithManager(manager); err != nil { + if err = (&resources.Namespaced{}).SetupWithManager(manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "resources.Namespaced") os.Exit(1) }