Skip to content

Commit

Permalink
Fixes the requeueing bug which requeues a reconcile request in a wron…
Browse files Browse the repository at this point in the history
…g workqueue

Signed-off-by: Alper Rifat Ulucinar <[email protected]>
  • Loading branch information
ulucinar committed Aug 18, 2023
1 parent 6e0d116 commit 4cfa964
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 19 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
PROJECT_NAME := upjet
PROJECT_REPO := github.com/upbound/$(PROJECT_NAME)

GOLANGCILINT_VERSION ?= 1.53.3
# GOLANGCILINT_VERSION is inherited from build submodule by default.
# Uncomment below if you need to override the version.
# GOLANGCILINT_VERSION ?= 1.54.0
GO_REQUIRED_VERSION ?= 1.20

PLATFORMS ?= linux_amd64 linux_arm64
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed
providerHandle: ws.ProviderHandle,
eventHandler: c.eventHandler,
kube: c.kube,
logger: c.logger.WithValues("uid", mg.GetUID()),
logger: c.logger.WithValues("uid", mg.GetUID(), "name", mg.GetName(), "gvk", mg.GetObjectKind().GroupVersionKind().String()),
}, nil
}

Expand Down
30 changes: 27 additions & 3 deletions pkg/controller/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"

"github.com/crossplane/crossplane-runtime/pkg/logging"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand All @@ -31,16 +32,31 @@ type EventHandler struct {
innerHandler handler.EventHandler
queue workqueue.RateLimitingInterface
rateLimiterMap map[string]workqueue.RateLimiter
logger logging.Logger
mu *sync.RWMutex
}

// Option configures an option for the EventHandler.
type Option func(eventHandler *EventHandler)

// WithLogger configures the logger for the EventHandler.
func WithLogger(logger logging.Logger) Option {
return func(eventHandler *EventHandler) {
eventHandler.logger = logger
}
}

// NewEventHandler initializes a new EventHandler instance.
func NewEventHandler() *EventHandler {
return &EventHandler{
func NewEventHandler(opts ...Option) *EventHandler {
eh := &EventHandler{
innerHandler: &handler.EnqueueRequestForObject{},
mu: &sync.RWMutex{},
rateLimiterMap: make(map[string]workqueue.RateLimiter),
}
for _, o := range opts {
o(eh)
}
return eh
}

// RequestReconcile requeues a reconciliation request for the specified name.
Expand All @@ -51,6 +67,7 @@ func (e *EventHandler) RequestReconcile(rateLimiterName, name string, failureLim
if e.queue == nil {
return false
}
logger := e.logger.WithValues("name", name)
item := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: name,
Expand All @@ -62,9 +79,12 @@ func (e *EventHandler) RequestReconcile(rateLimiterName, name string, failureLim
e.rateLimiterMap[rateLimiterName] = rateLimiter
}
if failureLimit != nil && rateLimiter.NumRequeues(item) > *failureLimit {
logger.Info("Failure limit has been exceeded.", "failureLimit", *failureLimit, "numRequeues", rateLimiter.NumRequeues(item))
return false
}
e.queue.AddAfter(item, rateLimiter.When(item))
when := rateLimiter.When(item)
e.queue.AddAfter(item, when)
logger.Debug("Reconcile request has been requeued.", "rateLimiterName", rateLimiterName, "when", when)
return true
}

Expand Down Expand Up @@ -94,20 +114,24 @@ func (e *EventHandler) setQueue(limitingInterface workqueue.RateLimitingInterfac

func (e *EventHandler) Create(ctx context.Context, ev event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
e.setQueue(limitingInterface)
e.logger.Debug("Calling the inner handler for Create event.", "name", ev.Object.GetName(), "queueLength", limitingInterface.Len())
e.innerHandler.Create(ctx, ev, limitingInterface)
}

func (e *EventHandler) Update(ctx context.Context, ev event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
e.setQueue(limitingInterface)
e.logger.Debug("Calling the inner handler for Update event.", "name", ev.ObjectOld.GetName(), "queueLength", limitingInterface.Len())
e.innerHandler.Update(ctx, ev, limitingInterface)
}

func (e *EventHandler) Delete(ctx context.Context, ev event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
e.setQueue(limitingInterface)
e.logger.Debug("Calling the inner handler for Delete event.", "name", ev.Object.GetName(), "queueLength", limitingInterface.Len())
e.innerHandler.Delete(ctx, ev, limitingInterface)
}

func (e *EventHandler) Generic(ctx context.Context, ev event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) {
e.setQueue(limitingInterface)
e.logger.Debug("Calling the inner handler for Generic event.", "name", ev.Object.GetName(), "queueLength", limitingInterface.Len())
e.innerHandler.Generic(ctx, ev, limitingInterface)
}
5 changes: 0 additions & 5 deletions pkg/controller/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/upbound/upjet/pkg/config"
"github.com/upbound/upjet/pkg/controller/handler"
"github.com/upbound/upjet/pkg/terraform"
)

Expand Down Expand Up @@ -39,10 +38,6 @@ type Options struct {

// ESSOptions for External Secret Stores.
ESSOptions *ESSOptions

// EventHandler to handle the Kubernetes events and
// to queue reconcile requests.
EventHandler *handler.EventHandler
}

// ESSOptions for External Secret Stores.
Expand Down
8 changes: 5 additions & 3 deletions pkg/pipeline/templates/controller.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/upbound/upjet/pkg/controller/handler"
tjcontroller "github.com/upbound/upjet/pkg/controller"
"github.com/upbound/upjet/pkg/terraform"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -35,11 +36,12 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
if o.SecretStoreConfigGVK != nil {
cps = append(cps, connection.NewDetailsManager(mgr.GetClient(), *o.SecretStoreConfigGVK, connection.WithTLSConfig(o.ESSOptions.TLSConfig)))
}
eventHandler := handler.NewEventHandler(handler.WithLogger(o.Logger.WithValues("gvk", {{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind)))
{{- if .UseAsync }}
ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(o.EventHandler))
ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(eventHandler))
{{- end}}
opts := []managed.ReconcilerOption{
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), tjcontroller.WithConnectorEventHandler(o.EventHandler),
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), tjcontroller.WithConnectorEventHandler(eventHandler),
{{- if .UseAsync }}
tjcontroller.WithCallbackProvider(ac),
{{- end}}
Expand All @@ -63,6 +65,6 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
Named(name).
WithOptions(o.ForControllerRuntime()).
WithEventFilter(xpresource.DesiredStateChanged()).
Watches(&{{ .TypePackageAlias }}{{ .CRD.Kind }}{}, o.EventHandler).
Watches(&{{ .TypePackageAlias }}{{ .CRD.Kind }}{}, eventHandler).
Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter))
}
6 changes: 0 additions & 6 deletions pkg/pipeline/templates/setup.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

"github.com/upbound/upjet/pkg/controller"
"github.com/upbound/upjet/pkg/controller/handler"

{{ .Imports }}
)

// Setup{{ .Group }} creates all controllers with the supplied logger and adds them to
// the supplied manager.
func Setup{{ .Group }}(mgr ctrl.Manager, o controller.Options) error {
// set the default event handler if the provider's main module did not
// set one.
if o.EventHandler == nil {
o.EventHandler = handler.NewEventHandler()
}
for _, setup := range []func(ctrl.Manager, controller.Options) error{
{{- range $alias := .Aliases }}
{{ $alias }}Setup,
Expand Down

0 comments on commit 4cfa964

Please sign in to comment.