Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes the requeueing bug which requeues a reconcile request in a wrong workqueue #259

Merged
merged 1 commit into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
PROJECT_NAME := upjet
PROJECT_REPO := github.com/upbound/$(PROJECT_NAME)

GOLANGCILINT_VERSION ?= 1.53.3
# GOLANGCILINT_VERSION is inherited from build submodule by default.
# Uncomment below if you need to override the version.
# GOLANGCILINT_VERSION ?= 1.54.0
GO_REQUIRED_VERSION ?= 1.20

PLATFORMS ?= linux_amd64 linux_arm64
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed
providerHandle: ws.ProviderHandle,
eventHandler: c.eventHandler,
kube: c.kube,
logger: c.logger.WithValues("uid", mg.GetUID()),
logger: c.logger.WithValues("uid", mg.GetUID(), "name", mg.GetName(), "gvk", mg.GetObjectKind().GroupVersionKind().String()),
}, nil
}

Expand Down
30 changes: 27 additions & 3 deletions pkg/controller/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"

"github.com/crossplane/crossplane-runtime/pkg/logging"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand All @@ -31,16 +32,31 @@ type EventHandler struct {
innerHandler handler.EventHandler
queue workqueue.RateLimitingInterface
rateLimiterMap map[string]workqueue.RateLimiter
logger logging.Logger
mu *sync.RWMutex
}

// Option configures an option for the EventHandler.
type Option func(eventHandler *EventHandler)

// WithLogger configures the logger for the EventHandler.
func WithLogger(logger logging.Logger) Option {
return func(eventHandler *EventHandler) {
eventHandler.logger = logger
}
}

// NewEventHandler initializes a new EventHandler instance.
func NewEventHandler() *EventHandler {
return &EventHandler{
func NewEventHandler(opts ...Option) *EventHandler {
eh := &EventHandler{
innerHandler: &handler.EnqueueRequestForObject{},
mu: &sync.RWMutex{},
rateLimiterMap: make(map[string]workqueue.RateLimiter),
}
for _, o := range opts {
o(eh)
}
return eh
}

// RequestReconcile requeues a reconciliation request for the specified name.
Expand All @@ -51,6 +67,7 @@ func (e *EventHandler) RequestReconcile(rateLimiterName, name string, failureLim
if e.queue == nil {
return false
}
logger := e.logger.WithValues("name", name)
item := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: name,
Expand All @@ -62,9 +79,12 @@ func (e *EventHandler) RequestReconcile(rateLimiterName, name string, failureLim
e.rateLimiterMap[rateLimiterName] = rateLimiter
}
if failureLimit != nil && rateLimiter.NumRequeues(item) > *failureLimit {
logger.Info("Failure limit has been exceeded.", "failureLimit", *failureLimit, "numRequeues", rateLimiter.NumRequeues(item))
return false
}
e.queue.AddAfter(item, rateLimiter.When(item))
when := rateLimiter.When(item)
e.queue.AddAfter(item, when)
logger.Debug("Reconcile request has been requeued.", "rateLimiterName", rateLimiterName, "when", when)
return true
}

Expand Down Expand Up @@ -94,20 +114,24 @@ func (e *EventHandler) setQueue(limitingInterface workqueue.RateLimitingInterfac

func (e *EventHandler) Create(ctx context.Context, ev event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
e.setQueue(limitingInterface)
e.logger.Debug("Calling the inner handler for Create event.", "name", ev.Object.GetName(), "queueLength", limitingInterface.Len())
e.innerHandler.Create(ctx, ev, limitingInterface)
}

func (e *EventHandler) Update(ctx context.Context, ev event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
e.setQueue(limitingInterface)
e.logger.Debug("Calling the inner handler for Update event.", "name", ev.ObjectOld.GetName(), "queueLength", limitingInterface.Len())
e.innerHandler.Update(ctx, ev, limitingInterface)
}

func (e *EventHandler) Delete(ctx context.Context, ev event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
e.setQueue(limitingInterface)
e.logger.Debug("Calling the inner handler for Delete event.", "name", ev.Object.GetName(), "queueLength", limitingInterface.Len())
e.innerHandler.Delete(ctx, ev, limitingInterface)
}

func (e *EventHandler) Generic(ctx context.Context, ev event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) {
e.setQueue(limitingInterface)
e.logger.Debug("Calling the inner handler for Generic event.", "name", ev.Object.GetName(), "queueLength", limitingInterface.Len())
e.innerHandler.Generic(ctx, ev, limitingInterface)
}
5 changes: 0 additions & 5 deletions pkg/controller/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/upbound/upjet/pkg/config"
"github.com/upbound/upjet/pkg/controller/handler"
"github.com/upbound/upjet/pkg/terraform"
)

Expand Down Expand Up @@ -39,10 +38,6 @@ type Options struct {

// ESSOptions for External Secret Stores.
ESSOptions *ESSOptions

// EventHandler to handle the Kubernetes events and
// to queue reconcile requests.
EventHandler *handler.EventHandler
}

// ESSOptions for External Secret Stores.
Expand Down
8 changes: 5 additions & 3 deletions pkg/pipeline/templates/controller.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/upbound/upjet/pkg/controller/handler"
tjcontroller "github.com/upbound/upjet/pkg/controller"
"github.com/upbound/upjet/pkg/terraform"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -35,11 +36,12 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
if o.SecretStoreConfigGVK != nil {
cps = append(cps, connection.NewDetailsManager(mgr.GetClient(), *o.SecretStoreConfigGVK, connection.WithTLSConfig(o.ESSOptions.TLSConfig)))
}
eventHandler := handler.NewEventHandler(handler.WithLogger(o.Logger.WithValues("gvk", {{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am writing this comment to make sure that what I understand is correct.

As far as I understand, before, we were using a universal event handler for each resource controller that belonged to the provider. With this PR, we allow each resource to use its event handler. This solves the error in the PR description.

Copy link
Collaborator Author

@ulucinar ulucinar Aug 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, correct. Previously, the event handler was unintentionally per API group (an API group was sharing an event handler) or per-provider. We are now fixing this so that an event handler is per GVK.

{{- if .UseAsync }}
ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(o.EventHandler))
ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(eventHandler))
{{- end}}
opts := []managed.ReconcilerOption{
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), tjcontroller.WithConnectorEventHandler(o.EventHandler),
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), tjcontroller.WithConnectorEventHandler(eventHandler),
{{- if .UseAsync }}
tjcontroller.WithCallbackProvider(ac),
{{- end}}
Expand All @@ -63,6 +65,6 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
Named(name).
WithOptions(o.ForControllerRuntime()).
WithEventFilter(xpresource.DesiredStateChanged()).
Watches(&{{ .TypePackageAlias }}{{ .CRD.Kind }}{}, o.EventHandler).
Watches(&{{ .TypePackageAlias }}{{ .CRD.Kind }}{}, eventHandler).
Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter))
}
6 changes: 0 additions & 6 deletions pkg/pipeline/templates/setup.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

"github.com/upbound/upjet/pkg/controller"
"github.com/upbound/upjet/pkg/controller/handler"

{{ .Imports }}
)

// Setup{{ .Group }} creates all controllers with the supplied logger and adds them to
// the supplied manager.
func Setup{{ .Group }}(mgr ctrl.Manager, o controller.Options) error {
// set the default event handler if the provider's main module did not
// set one.
if o.EventHandler == nil {
o.EventHandler = handler.NewEventHandler()
}
for _, setup := range []func(ctrl.Manager, controller.Options) error{
{{- range $alias := .Aliases }}
{{ $alias }}Setup,
Expand Down