Skip to content

Commit

Permalink
Use the generic/typed workqueue throughout
Browse files Browse the repository at this point in the history
This change makes us use the generic workqueue throughout the project in
order to improve type safety and readability of the code.
  • Loading branch information
alvaroaleman committed May 4, 2024
1 parent d387c0c commit 6d0ac8c
Show file tree
Hide file tree
Showing 94 changed files with 830 additions and 603 deletions.
9 changes: 7 additions & 2 deletions pkg/controller/bootstrap/bootstrapsigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type Signer struct {
// have one item (Named <ConfigMapName>) in this queue. We are using it
// serializes and collapses updates as they can come from both the ConfigMap
// and Secrets controllers.
syncQueue workqueue.RateLimitingInterface
syncQueue workqueue.TypedRateLimitingInterface[string]

secretLister corelisters.SecretLister
secretSynced cache.InformerSynced
Expand All @@ -103,7 +103,12 @@ func NewSigner(cl clientset.Interface, secrets informers.SecretInformer, configM
secretSynced: secrets.Informer().HasSynced,
configMapLister: configMaps.Lister(),
configMapSynced: configMaps.Informer().HasSynced,
syncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bootstrap_signer_queue"),
syncQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "bootstrap_signer_queue",
},
),
}

configMaps.Informer().AddEventHandlerWithResyncPeriod(
Expand Down
11 changes: 8 additions & 3 deletions pkg/controller/bootstrap/tokencleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type TokenCleaner struct {
// secretSynced returns true if the secret shared informer has been synced at least once.
secretSynced cache.InformerSynced

queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[string]
}

// NewTokenCleaner returns a new *NewTokenCleaner.
Expand All @@ -78,7 +78,12 @@ func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInforme
secretLister: secrets.Lister(),
secretSynced: secrets.Informer().HasSynced,
tokenSecretNamespace: options.TokenSecretNamespace,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "token_cleaner"),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "token_cleaner",
},
),
}

secrets.Informer().AddEventHandlerWithResyncPeriod(
Expand Down Expand Up @@ -144,7 +149,7 @@ func (tc *TokenCleaner) processNextWorkItem(ctx context.Context) bool {
}
defer tc.queue.Done(key)

if err := tc.syncFunc(ctx, key.(string)); err != nil {
if err := tc.syncFunc(ctx, key); err != nil {
tc.queue.AddRateLimited(key)
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", key, err))
return true
Expand Down
19 changes: 12 additions & 7 deletions pkg/controller/certificates/certificate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type CertificateController struct {

handler func(context.Context, *certificates.CertificateSigningRequest) error

queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[string]
}

func NewCertificateController(
Expand All @@ -63,11 +63,16 @@ func NewCertificateController(
cc := &CertificateController{
name: name,
kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), "certificate"),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.NewTypedMaxOfRateLimiter[string](
workqueue.NewTypedItemExponentialFailureRateLimiter[string](200*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "certificate",
},
),
handler: handler,
}

Expand Down Expand Up @@ -140,7 +145,7 @@ func (cc *CertificateController) processNextWorkItem(ctx context.Context) bool {
}
defer cc.queue.Done(cKey)

if err := cc.syncFunc(ctx, cKey.(string)); err != nil {
if err := cc.syncFunc(ctx, cKey); err != nil {
cc.queue.AddRateLimited(cKey)
if _, ignorable := err.(ignorableError); !ignorable {
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
Expand Down
11 changes: 8 additions & 3 deletions pkg/controller/certificates/rootcacertpublisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinf
e := &Publisher{
client: cl,
rootCA: rootCA,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root_ca_cert_publisher"),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "root_ca_cert_publisher",
},
),
}

cmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -90,7 +95,7 @@ type Publisher struct {

nsListerSynced cache.InformerSynced

queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[string]
}

// Run starts process
Expand Down Expand Up @@ -164,7 +169,7 @@ func (c *Publisher) processNextWorkItem(ctx context.Context) bool {
}
defer c.queue.Done(key)

if err := c.syncHandler(ctx, key.(string)); err != nil {
if err := c.syncHandler(ctx, key); err != nil {
utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
c.queue.AddRateLimited(key)
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type ClusterRoleAggregationController struct {
clusterRolesSynced cache.InformerSynced

syncHandler func(ctx context.Context, key string) error
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[string]
}

// NewClusterRoleAggregation creates a new controller
Expand All @@ -58,7 +58,12 @@ func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInfo
clusterRoleLister: clusterRoleInformer.Lister(),
clusterRolesSynced: clusterRoleInformer.Informer().HasSynced,

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ClusterRoleAggregator"),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "ClusterRoleAggregator",
},
),
}
c.syncHandler = c.syncClusterRole

Expand Down Expand Up @@ -212,7 +217,7 @@ func (c *ClusterRoleAggregationController) processNextWorkItem(ctx context.Conte
}
defer c.queue.Done(dsKey)

err := c.syncHandler(ctx, dsKey.(string))
err := c.syncHandler(ctx, dsKey)
if err == nil {
c.queue.Forget(dsKey)
return true
Expand Down
13 changes: 9 additions & 4 deletions pkg/controller/cronjob/cronjob_controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var (
// ControllerV2 is a controller for CronJobs.
// Refactored Cronjob controller that uses DelayingQueue and informers
type ControllerV2 struct {
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[string]

kubeClient clientset.Interface
recorder record.EventRecorder
Expand All @@ -85,7 +85,12 @@ func NewControllerV2(ctx context.Context, jobInformer batchv1informers.JobInform
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))

jm := &ControllerV2{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "cronjob",
},
),
kubeClient: kubeClient,
broadcaster: eventBroadcaster,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),
Expand Down Expand Up @@ -162,10 +167,10 @@ func (jm *ControllerV2) processNextWorkItem(ctx context.Context) bool {
}
defer jm.queue.Done(key)

requeueAfter, err := jm.sync(ctx, key.(string))
requeueAfter, err := jm.sync(ctx, key)
switch {
case err != nil:
utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err))
utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %w", key, err))
jm.queue.AddRateLimited(key)
case requeueAfter != nil:
jm.queue.Forget(key)
Expand Down
11 changes: 8 additions & 3 deletions pkg/controller/cronjob/cronjob_controllerv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,12 +1375,12 @@ func TestControllerV2SyncCronJob(t *testing.T) {
}

type fakeQueue struct {
workqueue.RateLimitingInterface
workqueue.TypedRateLimitingInterface[string]
delay time.Duration
key interface{}
}

func (f *fakeQueue) AddAfter(key interface{}, delay time.Duration) {
func (f *fakeQueue) AddAfter(key string, delay time.Duration) {
f.delay = delay
f.key = key
}
Expand Down Expand Up @@ -1593,7 +1593,12 @@ func TestControllerV2UpdateCronJob(t *testing.T) {
return
}
jm.now = justASecondBeforeTheHour
queue := &fakeQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-update-cronjob")}
queue := &fakeQueue{TypedRateLimitingInterface: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "test-update-cronjob",
},
)}
jm.queue = queue
jm.jobControl = &fakeJobControl{}
jm.cronJobControl = &fakeCJControl{}
Expand Down
11 changes: 8 additions & 3 deletions pkg/controller/daemon/daemon_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type DaemonSetsController struct {
nodeStoreSynced cache.InformerSynced

// DaemonSet keys that need to be synced.
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[string]

failedPodsBackoff *flowcontrol.Backoff
}
Expand Down Expand Up @@ -153,7 +153,12 @@ func NewDaemonSetsController(
},
burstReplicas: BurstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "daemonset",
},
),
}

daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -315,7 +320,7 @@ func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
}
defer dsc.queue.Done(dsKey)

err := dsc.syncHandler(ctx, dsKey.(string))
err := dsc.syncHandler(ctx, dsKey)
if err == nil {
dsc.queue.Forget(dsKey)
return true
Expand Down
32 changes: 16 additions & 16 deletions pkg/controller/daemon/daemon_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
// DeletedFinalStateUnknown should queue the embedded DS if found.
manager.deleteDaemonset(logger, cache.DeletedFinalStateUnknown{Key: "foo", Obj: ds})
enqueuedKey, _ := manager.queue.Get()
if enqueuedKey.(string) != "default/foo" {
if enqueuedKey != "default/foo" {
t.Errorf("expected delete of DeletedFinalStateUnknown to enqueue the daemonset but found: %#v", enqueuedKey)
}
}
Expand Down Expand Up @@ -2890,7 +2890,7 @@ func TestAddNode(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := manager.queue.Get()
if key == nil || done {
if key == "" || done {
t.Fatalf("failed to enqueue controller for node %v", node2.Name)
}
}
Expand Down Expand Up @@ -2920,11 +2920,11 @@ func TestAddPod(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := manager.queue.Get()
if key == nil || done {
if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(ds1)
if got, want := key.(string), expectedKey; got != want {
if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}

Expand All @@ -2934,11 +2934,11 @@ func TestAddPod(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = manager.queue.Get()
if key == nil || done {
if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(ds2)
if got, want := key.(string), expectedKey; got != want {
if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
Expand Down Expand Up @@ -3011,11 +3011,11 @@ func TestUpdatePod(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := manager.queue.Get()
if key == nil || done {
if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(ds1)
if got, want := key.(string), expectedKey; got != want {
if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}

Expand All @@ -3027,11 +3027,11 @@ func TestUpdatePod(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = manager.queue.Get()
if key == nil || done {
if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(ds2)
if got, want := key.(string), expectedKey; got != want {
if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
Expand Down Expand Up @@ -3189,11 +3189,11 @@ func TestDeletePod(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := manager.queue.Get()
if key == nil || done {
if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(ds1)
if got, want := key.(string), expectedKey; got != want {
if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}

Expand All @@ -3203,11 +3203,11 @@ func TestDeletePod(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = manager.queue.Get()
if key == nil || done {
if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(ds2)
if got, want := key.(string), expectedKey; got != want {
if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
Expand Down Expand Up @@ -3255,15 +3255,15 @@ func bumpResourceVersion(obj metav1.Object) {

// getQueuedKeys returns a sorted list of keys in the queue.
// It can be used to quickly check that multiple keys are in there.
func getQueuedKeys(queue workqueue.RateLimitingInterface) []string {
func getQueuedKeys(queue workqueue.TypedRateLimitingInterface[string]) []string {
var keys []string
count := queue.Len()
for i := 0; i < count; i++ {
key, done := queue.Get()
if done {
return keys
}
keys = append(keys, key.(string))
keys = append(keys, key)
}
sort.Strings(keys)
return keys
Expand Down
Loading

0 comments on commit 6d0ac8c

Please sign in to comment.