Skip to content

Commit

Permalink
feat: add flag --workers
Browse files Browse the repository at this point in the history
Signed-off-by: Oliver Bähler <[email protected]>
  • Loading branch information
oliverbaehler committed Mar 22, 2024
1 parent 3ef5af6 commit 42a81bd
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 17 deletions.
4 changes: 3 additions & 1 deletion controllers/pod/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -123,8 +124,9 @@ func (m *MetadataReconciler) isNamespaceInTenant(ctx context.Context, namespace
return len(tl.Items) > 0
}

func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}, m.forOptionPerInstanceName(ctx)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(m)
}
4 changes: 3 additions & 1 deletion controllers/pv/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
log2 "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -88,7 +89,7 @@ func (c *Controller) Reconcile(ctx context.Context, request reconcile.Request) (
return reconcile.Result{}, nil
}

func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
func (c *Controller) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
label, err := capsuleutils.GetTypeLabel(&capsulev1beta2.Tenant{})
if err != nil {
return err
Expand All @@ -113,5 +114,6 @@ func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {

return !ok
}))).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(c)
}
4 changes: 3 additions & 1 deletion controllers/rbac/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -32,11 +33,12 @@ type Manager struct {
}

//nolint:revive
func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string) (err error) {
func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string, workerCount int) (err error) {
namesPredicate := utils.NamesMatchingPredicate(ProvisionerRoleName, DeleterRoleName)

crErr := ctrl.NewControllerManagedBy(mgr).
For(&rbacv1.ClusterRole{}, namesPredicate).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
if crErr != nil {
err = multierror.Append(err, crErr)
Expand Down
4 changes: 3 additions & 1 deletion controllers/resources/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (r *Global) enqueueRequestFromTenant(ctx context.Context, object client.Obj
return reqs
}

func (r *Global) SetupWithManager(mgr ctrl.Manager) error {
func (r *Global) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
r.client = mgr.GetClient()
r.processor = Processor{
client: mgr.GetClient(),
Expand All @@ -72,6 +73,7 @@ func (r *Global) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.GlobalTenantResource{}).
Watches(&capsulev1beta2.Tenant{}, handler.EnqueueRequestsFromMapFunc(r.enqueueRequestFromTenant)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}

Expand Down
4 changes: 3 additions & 1 deletion controllers/resources/namespaced.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -26,14 +27,15 @@ type Namespaced struct {
processor Processor
}

func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error {
func (r *Namespaced) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
r.client = mgr.GetClient()
r.processor = Processor{
client: mgr.GetClient(),
}

return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.TenantResource{}).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}

Expand Down
4 changes: 3 additions & 1 deletion controllers/servicelabels/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

type EndpointsLabelsReconciler struct {
Expand All @@ -17,7 +18,7 @@ type EndpointsLabelsReconciler struct {
Log logr.Logger
}

func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error {
r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{
obj: &corev1.Endpoints{},
client: mgr.GetClient(),
Expand All @@ -26,5 +27,6 @@ func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ct

return ctrl.NewControllerManagedBy(mgr).
For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}
4 changes: 3 additions & 1 deletion controllers/servicelabels/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

type ServicesLabelsReconciler struct {
Expand All @@ -17,7 +18,7 @@ type ServicesLabelsReconciler struct {
Log logr.Logger
}

func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error {
r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{
obj: &corev1.Service{},
client: mgr.GetClient(),
Expand All @@ -26,5 +27,6 @@ func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctr

return ctrl.NewControllerManagedBy(mgr).
For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}
4 changes: 3 additions & 1 deletion controllers/tenant/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

capsulev1beta2 "github.com/projectcapsule/capsule/api/v1beta2"
Expand All @@ -28,14 +29,15 @@ type Manager struct {
RESTConfig *rest.Config
}

func (r *Manager) SetupWithManager(mgr ctrl.Manager) error {
func (r *Manager) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.Tenant{}).
Owns(&corev1.Namespace{}).
Owns(&networkingv1.NetworkPolicy{}).
Owns(&corev1.LimitRange{}).
Owns(&corev1.ResourceQuota{}).
Owns(&rbacv1.RoleBinding{}).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}

Expand Down
19 changes: 10 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {

var metricsAddr, namespace, configurationName string

var webhookPort int
var webhookPort, workerCount int

var goFlagSet goflag.FlagSet

Expand All @@ -94,6 +94,7 @@ func main() {
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&version, "version", false, "Print the Capsule version and exit")
flag.StringVar(&configurationName, "configuration-name", "default", "The CapsuleConfiguration resource name to use")
flag.IntVar(&workerCount, "workers", 1, "Defines the number of concurrent reconciles the controller can handle")

opts := zap.Options{
EncoderConfigOptions: append([]zap.EncoderConfigOption{}, func(config *zapcore.EncoderConfig) {
Expand Down Expand Up @@ -194,7 +195,7 @@ func main() {
Client: manager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Tenant"),
Recorder: manager.GetEventRecorderFor("tenant-controller"),
}).SetupWithManager(manager); err != nil {
}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Tenant")
os.Exit(1)
}
Expand Down Expand Up @@ -254,21 +255,21 @@ func main() {
os.Exit(1)
}

if err = rbacManager.SetupWithManager(ctx, manager, configurationName); err != nil {
if err = rbacManager.SetupWithManager(ctx, manager, configurationName, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Rbac")
os.Exit(1)
}

if err = (&servicelabelscontroller.ServicesLabelsReconciler{
Log: ctrl.Log.WithName("controllers").WithName("ServiceLabels"),
}).SetupWithManager(ctx, manager); err != nil {
}).SetupWithManager(ctx, manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ServiceLabels")
os.Exit(1)
}

if err = (&servicelabelscontroller.EndpointsLabelsReconciler{
Log: ctrl.Log.WithName("controllers").WithName("EndpointLabels"),
}).SetupWithManager(ctx, manager); err != nil {
}).SetupWithManager(ctx, manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "EndpointLabels")
os.Exit(1)
}
Expand All @@ -281,12 +282,12 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "EndpointSliceLabels")
}

if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager); err != nil {
if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PodLabels")
os.Exit(1)
}

if err = (&pv.Controller{}).SetupWithManager(manager); err != nil {
if err = (&pv.Controller{}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PersistentVolume")
os.Exit(1)
}
Expand All @@ -298,12 +299,12 @@ func main() {
os.Exit(1)
}

if err = (&resources.Global{}).SetupWithManager(manager); err != nil {
if err = (&resources.Global{}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "resources.Global")
os.Exit(1)
}

if err = (&resources.Namespaced{}).SetupWithManager(manager); err != nil {
if err = (&resources.Namespaced{}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "resources.Namespaced")
os.Exit(1)
}
Expand Down

0 comments on commit 42a81bd

Please sign in to comment.