Merge pull request kubernetes#124600 from alvaroaleman/typed-wq
Use the generic/typed workqueue throughout
k8s-ci-robot authored May 6, 2024
2 parents 54687f3 + 6d0ac8c commit 1dc30bf
Showing 94 changed files with 830 additions and 603 deletions.
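The change repeated across these files is mechanical: every controller queue moves from the untyped workqueue.RateLimitingInterface, whose Get returns interface{} and forces a key.(string) assertion, to the generic workqueue.TypedRateLimitingInterface[string]. As a minimal, hedged sketch of the pattern outside any real controller (the package, helper, and queue names below are illustrative and not part of this PR), the constructor change looks like this:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// newExampleQueue is an illustrative helper, not code from this commit: it shows
// the typed constructor that replaces workqueue.NewNamedRateLimitingQueue
// throughout the diff below.
func newExampleQueue() workqueue.TypedRateLimitingInterface[string] {
	return workqueue.NewTypedRateLimitingQueueWithConfig(
		// Same default controller rate limiter as before, instantiated for string keys.
		workqueue.DefaultTypedControllerRateLimiter[string](),
		workqueue.TypedRateLimitingQueueConfig[string]{
			// Name feeds the queue metrics, as the old "named" constructor did.
			Name: "example_controller",
		},
	)
}

func main() {
	q := newExampleQueue()
	q.Add("default/foo")     // keys are plain strings, no interface{} boxing
	key, shutdown := q.Get() // key is already a string; no key.(string) needed
	fmt.Println(key, shutdown)
	q.Done(key)
	q.ShutDown()
}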
9 changes: 7 additions & 2 deletions pkg/controller/bootstrap/bootstrapsigner.go
@@ -82,7 +82,7 @@ type Signer struct {
// have one item (Named <ConfigMapName>) in this queue. We are using it
// serializes and collapses updates as they can come from both the ConfigMap
// and Secrets controllers.
- syncQueue workqueue.RateLimitingInterface
+ syncQueue workqueue.TypedRateLimitingInterface[string]

secretLister corelisters.SecretLister
secretSynced cache.InformerSynced
@@ -103,7 +103,12 @@ func NewSigner(cl clientset.Interface, secrets informers.SecretInformer, configM
secretSynced: secrets.Informer().HasSynced,
configMapLister: configMaps.Lister(),
configMapSynced: configMaps.Informer().HasSynced,
- syncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bootstrap_signer_queue"),
+ syncQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
+ workqueue.DefaultTypedControllerRateLimiter[string](),
+ workqueue.TypedRateLimitingQueueConfig[string]{
+ Name: "bootstrap_signer_queue",
+ },
+ ),
}

configMaps.Informer().AddEventHandlerWithResyncPeriod(
11 changes: 8 additions & 3 deletions pkg/controller/bootstrap/tokencleaner.go
@@ -68,7 +68,7 @@ type TokenCleaner struct {
// secretSynced returns true if the secret shared informer has been synced at least once.
secretSynced cache.InformerSynced

- queue workqueue.RateLimitingInterface
+ queue workqueue.TypedRateLimitingInterface[string]
}

// NewTokenCleaner returns a new *NewTokenCleaner.
@@ -78,7 +78,12 @@ func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInforme
secretLister: secrets.Lister(),
secretSynced: secrets.Informer().HasSynced,
tokenSecretNamespace: options.TokenSecretNamespace,
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "token_cleaner"),
+ queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+ workqueue.DefaultTypedControllerRateLimiter[string](),
+ workqueue.TypedRateLimitingQueueConfig[string]{
+ Name: "token_cleaner",
+ },
+ ),
}

secrets.Informer().AddEventHandlerWithResyncPeriod(
@@ -144,7 +149,7 @@ func (tc *TokenCleaner) processNextWorkItem(ctx context.Context) bool {
}
defer tc.queue.Done(key)

- if err := tc.syncFunc(ctx, key.(string)); err != nil {
+ if err := tc.syncFunc(ctx, key); err != nil {
tc.queue.AddRateLimited(key)
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", key, err))
return true
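The second half of the pattern appears in every processNextWorkItem hunk, like the tokencleaner one above: Get now returns a string, so the key.(string) assertion disappears. A minimal sketch of that worker loop under those assumptions — the function and syncFunc names are illustrative stand-ins, not the controllers' own code:

package worker

import (
	"context"
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem sketches the post-migration worker loop; syncFunc stands in
// for a controller's per-key sync handler.
func processNextWorkItem(ctx context.Context, queue workqueue.TypedRateLimitingInterface[string],
	syncFunc func(context.Context, string) error) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	if err := syncFunc(ctx, key); err != nil {
		// Failure: requeue with per-key backoff, exactly as before the migration.
		queue.AddRateLimited(key)
		utilruntime.HandleError(fmt.Errorf("sync %q failed: %w", key, err))
		return true
	}

	// Success: clear this key's rate-limiting history.
	queue.Forget(key)
	return true
}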
19 changes: 12 additions & 7 deletions pkg/controller/certificates/certificate_controller.go
@@ -49,7 +49,7 @@ type CertificateController struct {

handler func(context.Context, *certificates.CertificateSigningRequest) error

- queue workqueue.RateLimitingInterface
+ queue workqueue.TypedRateLimitingInterface[string]
}

func NewCertificateController(
@@ -63,11 +63,16 @@ func NewCertificateController(
cc := &CertificateController{
name: name,
kubeClient: kubeClient,
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
- workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),
- // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
- &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
- ), "certificate"),
+ queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+ workqueue.NewTypedMaxOfRateLimiter[string](
+ workqueue.NewTypedItemExponentialFailureRateLimiter[string](200*time.Millisecond, 1000*time.Second),
+ // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
+ &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+ ),
+ workqueue.TypedRateLimitingQueueConfig[string]{
+ Name: "certificate",
+ },
+ ),
handler: handler,
}

@@ -140,7 +145,7 @@ func (cc *CertificateController) processNextWorkItem(ctx context.Context) bool {
}
defer cc.queue.Done(cKey)

- if err := cc.syncFunc(ctx, cKey.(string)); err != nil {
+ if err := cc.syncFunc(ctx, cKey); err != nil {
cc.queue.AddRateLimited(cKey)
if _, ignorable := err.(ignorableError); !ignorable {
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
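The certificate controller is the one place in this commit where the rate limiter is composed by hand rather than taken from the default, and the typed variants compose the same way. A hedged sketch of that composition using the values from the hunk above (the package and function names are illustrative):

package certqueue

import (
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

// newCertificateQueue mirrors the limiter composition above: per-item exponential
// backoff (200ms, doubling, capped at 1000s) combined with a 10 qps / burst-100
// token bucket shared across all items.
func newCertificateQueue() workqueue.TypedRateLimitingInterface[string] {
	limiter := workqueue.NewTypedMaxOfRateLimiter[string](
		workqueue.NewTypedItemExponentialFailureRateLimiter[string](200*time.Millisecond, 1000*time.Second),
		&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
	return workqueue.NewTypedRateLimitingQueueWithConfig(
		limiter,
		workqueue.TypedRateLimitingQueueConfig[string]{Name: "certificate"},
	)
}

The max-of limiter takes the longest delay any constituent limiter proposes, so a repeatedly failing key is held back by its own exponential backoff while overall retry throughput stays under the bucket limit.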
11 changes: 8 additions & 3 deletions pkg/controller/certificates/rootcacertpublisher/publisher.go
@@ -55,7 +55,12 @@ func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinf
e := &Publisher{
client: cl,
rootCA: rootCA,
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root_ca_cert_publisher"),
+ queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+ workqueue.DefaultTypedControllerRateLimiter[string](),
+ workqueue.TypedRateLimitingQueueConfig[string]{
+ Name: "root_ca_cert_publisher",
+ },
+ ),
}

cmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -90,7 +95,7 @@ type Publisher struct {

nsListerSynced cache.InformerSynced

- queue workqueue.RateLimitingInterface
+ queue workqueue.TypedRateLimitingInterface[string]
}

// Run starts process
@@ -164,7 +169,7 @@ func (c *Publisher) processNextWorkItem(ctx context.Context) bool {
}
defer c.queue.Done(key)

- if err := c.syncHandler(ctx, key.(string)); err != nil {
+ if err := c.syncHandler(ctx, key); err != nil {
utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
c.queue.AddRateLimited(key)
return true
pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go
@@ -48,7 +48,7 @@ type ClusterRoleAggregationController struct {
clusterRolesSynced cache.InformerSynced

syncHandler func(ctx context.Context, key string) error
- queue workqueue.RateLimitingInterface
+ queue workqueue.TypedRateLimitingInterface[string]
}

// NewClusterRoleAggregation creates a new controller
@@ -58,7 +58,12 @@ func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInfo
clusterRoleLister: clusterRoleInformer.Lister(),
clusterRolesSynced: clusterRoleInformer.Informer().HasSynced,

- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ClusterRoleAggregator"),
+ queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+ workqueue.DefaultTypedControllerRateLimiter[string](),
+ workqueue.TypedRateLimitingQueueConfig[string]{
+ Name: "ClusterRoleAggregator",
+ },
+ ),
}
c.syncHandler = c.syncClusterRole

@@ -212,7 +217,7 @@ func (c *ClusterRoleAggregationController) processNextWorkItem(ctx context.Conte
}
defer c.queue.Done(dsKey)

- err := c.syncHandler(ctx, dsKey.(string))
+ err := c.syncHandler(ctx, dsKey)
if err == nil {
c.queue.Forget(dsKey)
return true
13 changes: 9 additions & 4 deletions pkg/controller/cronjob/cronjob_controllerv2.go
@@ -60,7 +60,7 @@ var (
// ControllerV2 is a controller for CronJobs.
// Refactored Cronjob controller that uses DelayingQueue and informers
type ControllerV2 struct {
- queue workqueue.RateLimitingInterface
+ queue workqueue.TypedRateLimitingInterface[string]

kubeClient clientset.Interface
recorder record.EventRecorder
@@ -85,7 +85,12 @@ func NewControllerV2(ctx context.Context, jobInformer batchv1informers.JobInform
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))

jm := &ControllerV2{
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
+ queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+ workqueue.DefaultTypedControllerRateLimiter[string](),
+ workqueue.TypedRateLimitingQueueConfig[string]{
+ Name: "cronjob",
+ },
+ ),
kubeClient: kubeClient,
broadcaster: eventBroadcaster,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),
@@ -162,10 +167,10 @@ func (jm *ControllerV2) processNextWorkItem(ctx context.Context) bool {
}
defer jm.queue.Done(key)

- requeueAfter, err := jm.sync(ctx, key.(string))
+ requeueAfter, err := jm.sync(ctx, key)
switch {
case err != nil:
- utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err))
+ utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %w", key, err))
jm.queue.AddRateLimited(key)
case requeueAfter != nil:
jm.queue.Forget(key)
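The cronjob worker differs slightly from the other controllers: its sync returns an optional requeue delay as well as an error, and the typed queue's AddAfter now takes the string key directly. A small illustrative sketch of that result handling (the helper below is not the controller's own code, and the elided tail of the hunk above is not reproduced):

package cronjobworker

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

// handleSyncResult shows the two requeue paths against a typed queue.
func handleSyncResult(queue workqueue.TypedRateLimitingInterface[string], key string,
	requeueAfter *time.Duration, err error) {
	switch {
	case err != nil:
		// Failure: retry with rate-limited backoff.
		queue.AddRateLimited(key)
	case requeueAfter != nil:
		// Success, but the CronJob should be looked at again later: reset the
		// backoff history, then enqueue the key after the requested delay.
		queue.Forget(key)
		queue.AddAfter(key, *requeueAfter)
	}
}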
11 changes: 8 additions & 3 deletions pkg/controller/cronjob/cronjob_controllerv2_test.go
@@ -1375,12 +1375,12 @@ func TestControllerV2SyncCronJob(t *testing.T) {
}

type fakeQueue struct {
- workqueue.RateLimitingInterface
+ workqueue.TypedRateLimitingInterface[string]
delay time.Duration
key interface{}
}

- func (f *fakeQueue) AddAfter(key interface{}, delay time.Duration) {
+ func (f *fakeQueue) AddAfter(key string, delay time.Duration) {
f.delay = delay
f.key = key
}
@@ -1593,7 +1593,12 @@ func TestControllerV2UpdateCronJob(t *testing.T) {
return
}
jm.now = justASecondBeforeTheHour
- queue := &fakeQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-update-cronjob")}
+ queue := &fakeQueue{TypedRateLimitingInterface: workqueue.NewTypedRateLimitingQueueWithConfig(
+ workqueue.DefaultTypedControllerRateLimiter[string](),
+ workqueue.TypedRateLimitingQueueConfig[string]{
+ Name: "test-update-cronjob",
+ },
+ )}
jm.queue = queue
jm.jobControl = &fakeJobControl{}
jm.cronJobControl = &fakeCJControl{}
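For tests, the fake in the hunks above keeps its shape: embed the real typed interface and override only what the test needs to observe. A condensed, illustrative version under those assumptions (the names are not from this PR, and the real fakeQueue keeps its key field as interface{}):

package cronjobtest

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

// fakeDelayQueue embeds a real typed queue and overrides AddAfter so a test can
// assert on the delay a controller requested.
type fakeDelayQueue struct {
	workqueue.TypedRateLimitingInterface[string]
	key   string
	delay time.Duration
}

func (f *fakeDelayQueue) AddAfter(key string, delay time.Duration) {
	f.key = key
	f.delay = delay
}

func newFakeDelayQueue(name string) *fakeDelayQueue {
	return &fakeDelayQueue{
		TypedRateLimitingInterface: workqueue.NewTypedRateLimitingQueueWithConfig(
			workqueue.DefaultTypedControllerRateLimiter[string](),
			workqueue.TypedRateLimitingQueueConfig[string]{Name: name},
		),
	}
}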
11 changes: 8 additions & 3 deletions pkg/controller/daemon/daemon_controller.go
@@ -123,7 +123,7 @@ type DaemonSetsController struct {
nodeStoreSynced cache.InformerSynced

// DaemonSet keys that need to be synced.
- queue workqueue.RateLimitingInterface
+ queue workqueue.TypedRateLimitingInterface[string]

failedPodsBackoff *flowcontrol.Backoff
}
@@ -153,7 +153,12 @@ func NewDaemonSetsController(
},
burstReplicas: BurstReplicas,
expectations: controller.NewControllerExpectations(),
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
+ queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+ workqueue.DefaultTypedControllerRateLimiter[string](),
+ workqueue.TypedRateLimitingQueueConfig[string]{
+ Name: "daemonset",
+ },
+ ),
}

daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -315,7 +320,7 @@ func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
}
defer dsc.queue.Done(dsKey)

- err := dsc.syncHandler(ctx, dsKey.(string))
+ err := dsc.syncHandler(ctx, dsKey)
if err == nil {
dsc.queue.Forget(dsKey)
return true
32 changes: 16 additions & 16 deletions pkg/controller/daemon/daemon_controller_test.go
@@ -474,7 +474,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
// DeletedFinalStateUnknown should queue the embedded DS if found.
manager.deleteDaemonset(logger, cache.DeletedFinalStateUnknown{Key: "foo", Obj: ds})
enqueuedKey, _ := manager.queue.Get()
- if enqueuedKey.(string) != "default/foo" {
+ if enqueuedKey != "default/foo" {
t.Errorf("expected delete of DeletedFinalStateUnknown to enqueue the daemonset but found: %#v", enqueuedKey)
}
}
@@ -2890,7 +2890,7 @@ func TestAddNode(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := manager.queue.Get()
- if key == nil || done {
+ if key == "" || done {
t.Fatalf("failed to enqueue controller for node %v", node2.Name)
}
}
@@ -2920,11 +2920,11 @@ func TestAddPod(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := manager.queue.Get()
- if key == nil || done {
+ if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(ds1)
- if got, want := key.(string), expectedKey; got != want {
+ if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}

@@ -2934,11 +2934,11 @@
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = manager.queue.Get()
- if key == nil || done {
+ if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(ds2)
- if got, want := key.(string), expectedKey; got != want {
+ if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
@@ -3011,11 +3011,11 @@ func TestUpdatePod(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := manager.queue.Get()
- if key == nil || done {
+ if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(ds1)
- if got, want := key.(string), expectedKey; got != want {
+ if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}

@@ -3027,11 +3027,11 @@
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = manager.queue.Get()
- if key == nil || done {
+ if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(ds2)
- if got, want := key.(string), expectedKey; got != want {
+ if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
@@ -3189,11 +3189,11 @@ func TestDeletePod(t *testing.T) {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := manager.queue.Get()
- if key == nil || done {
+ if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
}
expectedKey, _ := controller.KeyFunc(ds1)
- if got, want := key.(string), expectedKey; got != want {
+ if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}

@@ -3203,11 +3203,11 @@
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = manager.queue.Get()
- if key == nil || done {
+ if key == "" || done {
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
}
expectedKey, _ = controller.KeyFunc(ds2)
- if got, want := key.(string), expectedKey; got != want {
+ if got, want := key, expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
@@ -3255,15 +3255,15 @@ func bumpResourceVersion(obj metav1.Object) {

// getQueuedKeys returns a sorted list of keys in the queue.
// It can be used to quickly check that multiple keys are in there.
- func getQueuedKeys(queue workqueue.RateLimitingInterface) []string {
+ func getQueuedKeys(queue workqueue.TypedRateLimitingInterface[string]) []string {
var keys []string
count := queue.Len()
for i := 0; i < count; i++ {
key, done := queue.Get()
if done {
return keys
}
- keys = append(keys, key.(string))
+ keys = append(keys, key)
}
sort.Strings(keys)
return keys
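The comparisons against "" rather than nil in the daemonset tests above follow from the typed Get signature: once the queue is empty and shutting down, Get returns the zero value of the item type (here the empty string) together with done set to true. A small, hedged illustration of that behavior, assuming the typed workqueue from client-go:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[string](),
		workqueue.TypedRateLimitingQueueConfig[string]{Name: "drain_example"},
	)
	q.Add("default/foo")
	q.ShutDown()

	key, done := q.Get() // "default/foo", false: items already queued are still handed out
	fmt.Println(key, done)
	q.Done(key)

	key, done = q.Get() // "", true: zero value of string plus the shutdown signal
	fmt.Println(key, done)
}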
(Diff truncated; the remaining changed files of the 94 listed above are not shown here.)
