Skip to content

Commit

Permalink
feat: add flag --workers
Browse files Browse the repository at this point in the history
Signed-off-by: Oliver Bähler <[email protected]>
  • Loading branch information
oliverbaehler committed May 2, 2024
1 parent e418f74 commit 9b4e980
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 22 deletions.
4 changes: 3 additions & 1 deletion controllers/pod/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -123,8 +124,9 @@ func (m *MetadataReconciler) isNamespaceInTenant(ctx context.Context, namespace
return len(tl.Items) > 0
}

func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}, m.forOptionPerInstanceName(ctx)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(m)
}
4 changes: 3 additions & 1 deletion controllers/pv/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
log2 "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -88,7 +89,7 @@ func (c *Controller) Reconcile(ctx context.Context, request reconcile.Request) (
return reconcile.Result{}, nil
}

func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
func (c *Controller) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
label, err := capsuleutils.GetTypeLabel(&capsulev1beta2.Tenant{})
if err != nil {
return err
Expand All @@ -113,5 +114,6 @@ func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {

return !ok
}))).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(c)
}
4 changes: 3 additions & 1 deletion controllers/rbac/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -32,11 +33,12 @@ type Manager struct {
}

//nolint:revive
func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string) (err error) {
func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string, workerCount int) (err error) {
namesPredicate := utils.NamesMatchingPredicate(ProvisionerRoleName, DeleterRoleName)

crErr := ctrl.NewControllerManagedBy(mgr).
For(&rbacv1.ClusterRole{}, namesPredicate).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
if crErr != nil {
err = multierror.Append(err, crErr)
Expand Down
4 changes: 3 additions & 1 deletion controllers/resources/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (r *Global) enqueueRequestFromTenant(ctx context.Context, object client.Obj
return reqs
}

func (r *Global) SetupWithManager(mgr ctrl.Manager) error {
func (r *Global) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
r.client = mgr.GetClient()
r.processor = Processor{
client: mgr.GetClient(),
Expand All @@ -72,6 +73,7 @@ func (r *Global) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.GlobalTenantResource{}).
Watches(&capsulev1beta2.Tenant{}, handler.EnqueueRequestsFromMapFunc(r.enqueueRequestFromTenant)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}

Expand Down
4 changes: 3 additions & 1 deletion controllers/resources/namespaced.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -26,14 +27,15 @@ type Namespaced struct {
processor Processor
}

func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error {
func (r *Namespaced) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
r.client = mgr.GetClient()
r.processor = Processor{
client: mgr.GetClient(),
}

return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.TenantResource{}).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}

Expand Down
4 changes: 3 additions & 1 deletion controllers/servicelabels/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

type EndpointsLabelsReconciler struct {
Expand All @@ -17,7 +18,7 @@ type EndpointsLabelsReconciler struct {
Log logr.Logger
}

func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error {
r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{
obj: &corev1.Endpoints{},
client: mgr.GetClient(),
Expand All @@ -26,5 +27,6 @@ func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ct

return ctrl.NewControllerManagedBy(mgr).
For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}
4 changes: 3 additions & 1 deletion controllers/servicelabels/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

type ServicesLabelsReconciler struct {
Expand All @@ -17,7 +18,7 @@ type ServicesLabelsReconciler struct {
Log logr.Logger
}

func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error {
r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{
obj: &corev1.Service{},
client: mgr.GetClient(),
Expand All @@ -26,5 +27,6 @@ func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctr

return ctrl.NewControllerManagedBy(mgr).
For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}
48 changes: 45 additions & 3 deletions controllers/tenant/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,49 @@ package tenant

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/juju/mutex/v2"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

capsulev1beta2 "github.com/projectcapsule/capsule/api/v1beta2"
)

type Manager struct {
client.Client
Log logr.Logger
Recorder record.EventRecorder
RESTConfig *rest.Config
Log logr.Logger
Recorder record.EventRecorder
RESTConfig *rest.Config
MaxConcurrentReconciles int
clock mutex.Clock
}

func (r *Manager) SetupWithManager(mgr ctrl.Manager) error {
r.clock = clock.RealClock{}

return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.Tenant{}).
Owns(&corev1.Namespace{}).
Owns(&networkingv1.NetworkPolicy{}).
Owns(&corev1.LimitRange{}).
Owns(&corev1.ResourceQuota{}).
Owns(&rbacv1.RoleBinding{}).
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
Complete(r)
}

Expand All @@ -55,6 +67,26 @@ func (r Manager) Reconcile(ctx context.Context, request ctrl.Request) (result ct

return
}

releaser, err := mutex.Acquire(r.mutexSpec(instance))
if err != nil {
switch {
case errors.As(err, &mutex.ErrTimeout):

Check failure on line 74 in controllers/tenant/manager.go

View workflow job for this annotation

GitHub Actions / lint

errorsas: second argument to errors.As should not be *error (govet)
r.Log.Info("acquire timed out, current process is blocked by another reconciliation")

return ctrl.Result{Requeue: true}, nil
case errors.As(err, &mutex.ErrCancelled):

Check failure on line 78 in controllers/tenant/manager.go

View workflow job for this annotation

GitHub Actions / lint

errorsas: second argument to errors.As should not be *error (govet)
r.Log.Info("acquire cancelled")

return ctrl.Result{Requeue: true}, nil
default:
r.Log.Error(err, "acquire failed")

return ctrl.Result{}, err
}
}
defer releaser.Release()

// Ensuring the Tenant Status
if err = r.updateTenantStatus(ctx, instance); err != nil {
r.Log.Error(err, "Cannot update Tenant status")
Expand Down Expand Up @@ -138,6 +170,16 @@ func (r Manager) Reconcile(ctx context.Context, request ctrl.Request) (result ct
return ctrl.Result{}, err
}

func (r *Manager) mutexSpec(obj client.Object) mutex.Spec {
return mutex.Spec{
Name: strings.ReplaceAll(fmt.Sprintf("capsule%s", obj.GetUID()), "-", ""),
Clock: r.clock,
Delay: 2 * time.Millisecond,
Timeout: time.Second,
Cancel: nil,
}
}

func (r *Manager) updateTenantStatus(ctx context.Context, tnt *capsulev1beta2.Tenant) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
if tnt.Spec.Cordoned {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/go-logr/logr v1.4.1
github.com/hashicorp/go-multierror v1.1.1
github.com/juju/mutex/v2 v2.0.0
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -49,6 +50,7 @@ require (
github.com/imdario/mergo v0.3.13 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 h1:EJHbsNpQyupmMeWTq7inn+5L/WZ7JfzCVPJ+DP9McCQ=
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9/go.mod h1:TRm7EVGA3mQOqSVcBySRY7a9Y1/gyVhh/WTCnc5sD4U=
github.com/juju/mutex/v2 v2.0.0 h1:rVmJdOaXGWF8rjcFHBNd4x57/1tks5CgXHx55O55SB0=
github.com/juju/mutex/v2 v2.0.0/go.mod h1:jwCfBs/smYDaeZLqeaCi8CB8M+tOes4yf827HoOEoqk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
Expand Down
26 changes: 14 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {

var metricsAddr, namespace, configurationName string

var webhookPort int
var webhookPort, workerCount int

var goFlagSet goflag.FlagSet

Expand All @@ -94,6 +94,7 @@ func main() {
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&version, "version", false, "Print the Capsule version and exit")
flag.StringVar(&configurationName, "configuration-name", "default", "The CapsuleConfiguration resource name to use")
flag.IntVar(&workerCount, "workers", 1, "Defines the number of concurrent reconciles the controller can handle")

opts := zap.Options{
EncoderConfigOptions: append([]zap.EncoderConfigOption{}, func(config *zapcore.EncoderConfig) {
Expand Down Expand Up @@ -190,10 +191,11 @@ func main() {
}

if err = (&tenantcontroller.Manager{
RESTConfig: manager.GetConfig(),
Client: manager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Tenant"),
Recorder: manager.GetEventRecorderFor("tenant-controller"),
RESTConfig: manager.GetConfig(),
Client: manager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Tenant"),
Recorder: manager.GetEventRecorderFor("tenant-controller"),
MaxConcurrentReconciles: workerCount,
}).SetupWithManager(manager); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Tenant")
os.Exit(1)
Expand Down Expand Up @@ -254,21 +256,21 @@ func main() {
os.Exit(1)
}

if err = rbacManager.SetupWithManager(ctx, manager, configurationName); err != nil {
if err = rbacManager.SetupWithManager(ctx, manager, configurationName, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Rbac")
os.Exit(1)
}

if err = (&servicelabelscontroller.ServicesLabelsReconciler{
Log: ctrl.Log.WithName("controllers").WithName("ServiceLabels"),
}).SetupWithManager(ctx, manager); err != nil {
}).SetupWithManager(ctx, manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ServiceLabels")
os.Exit(1)
}

if err = (&servicelabelscontroller.EndpointsLabelsReconciler{
Log: ctrl.Log.WithName("controllers").WithName("EndpointLabels"),
}).SetupWithManager(ctx, manager); err != nil {
}).SetupWithManager(ctx, manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "EndpointLabels")
os.Exit(1)
}
Expand All @@ -281,12 +283,12 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "EndpointSliceLabels")
}

if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager); err != nil {
if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PodLabels")
os.Exit(1)
}

if err = (&pv.Controller{}).SetupWithManager(manager); err != nil {
if err = (&pv.Controller{}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PersistentVolume")
os.Exit(1)
}
Expand All @@ -298,12 +300,12 @@ func main() {
os.Exit(1)
}

if err = (&resources.Global{}).SetupWithManager(manager); err != nil {
if err = (&resources.Global{}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "resources.Global")
os.Exit(1)
}

if err = (&resources.Namespaced{}).SetupWithManager(manager); err != nil {
if err = (&resources.Namespaced{}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "resources.Namespaced")
os.Exit(1)
}
Expand Down

0 comments on commit 9b4e980

Please sign in to comment.