diff --git a/pkg/component/controller/workerconfig/reconciler.go b/pkg/component/controller/workerconfig/reconciler.go index 15553748120c..ac8bdf5757e2 100644 --- a/pkg/component/controller/workerconfig/reconciler.go +++ b/pkg/component/controller/workerconfig/reconciler.go @@ -165,7 +165,7 @@ func (r *Reconciler) Start(context.Context) error { // the update channel and apply those to the desired state. Changes will be // applied whenever the last reconciled state differs from the desired // state. - reconcilerCtx, cancelReconciler := context.WithCancel(context.Background()) + reconcilerCtx, cancelReconciler := context.WithCancelCause(context.Background()) stopped := make(chan struct{}) apply := r.apply go func() { @@ -177,10 +177,10 @@ func (r *Reconciler) Start(context.Context) error { go func() { wait.UntilWithContext(reconcilerCtx, func(ctx context.Context) { - err := r.reconcileAPIServers(ctx, updates, stopped) + err := r.reconcileAPIServers(ctx, updates) // Log any reconciliation errors, but only if they don't // indicate that the reconciler has been stopped concurrently. - if err != nil && !errors.Is(err, reconcilerCtx.Err()) && !errors.Is(err, errStoppedConcurrently) { + if err != nil && !errors.Is(err, errStoppedConcurrently) { r.log.WithError(err).Error("Failed to reconcile API server addresses") } }, 10*time.Second) @@ -189,10 +189,10 @@ func (r *Reconciler) Start(context.Context) error { // lease is acquired. r.leaderElector.AddAcquiredLeaseCallback(func() { go func() { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancel := context.WithTimeout(reconcilerCtx, 1*time.Minute) defer cancel() - err := reconcile(ctx, updates, stopped, func(s *snapshot) { + err := reconcile(ctx, updates, func(s *snapshot) { s.serial++ }) @@ -207,7 +207,7 @@ func (r *Reconciler) Start(context.Context) error { // Store the started state r.apply = nil r.updates = updates - r.requestStop = cancelReconciler + r.requestStop = func() { cancelReconciler(errStoppedConcurrently) } r.stopped = stopped r.state = reconcilerStarted @@ -236,8 +236,8 @@ func (r *Reconciler) runReconcileLoop(ctx context.Context, updates <-chan update var desiredState, reconciledState snapshot runReconciliation := func() error { - if err := ctx.Err(); err != nil { - return fmt.Errorf("%w while processing reconciliation", errStoppedConcurrently) + if err := context.Cause(ctx); err != nil { + return fmt.Errorf("%w while processing reconciliation", err) } if !r.leaderElector.IsLeader() { @@ -322,7 +322,18 @@ func (r *Reconciler) Reconcile(ctx context.Context, cluster *v1beta1.ClusterConf configSnapshot := takeConfigSnapshot(cluster.Spec) - return reconcile(ctx, updates, stopped, func(s *snapshot) { + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + go func() { + select { + case <-stopped: + cancel(errStoppedConcurrently) + case <-ctx.Done(): + } + }() + + return reconcile(ctx, updates, func(s *snapshot) { s.configSnapshot = &configSnapshot }) } @@ -330,25 +341,21 @@ func (r *Reconciler) Reconcile(ctx context.Context, cluster *v1beta1.ClusterConf var errStoppedConcurrently = errors.New("stopped concurrently") // reconcile enqueues the given update and awaits its reconciliation. -func reconcile(ctx context.Context, updates chan<- updateFunc, stopped <-chan struct{}, update func(*snapshot)) error { +func reconcile(ctx context.Context, updates chan<- updateFunc, update func(*snapshot)) error { recoDone := make(chan error, 1) select { case updates <- func(s *snapshot) chan<- error { update(s); return recoDone }: break - case <-stopped: - return fmt.Errorf("%w while trying to enqueue state update", errStoppedConcurrently) case <-ctx.Done(): - return fmt.Errorf("%w while trying to enqueue state update", ctx.Err()) + return fmt.Errorf("%w while trying to enqueue state update", context.Cause(ctx)) } select { case err := <-recoDone: return err - case <-stopped: - return fmt.Errorf("%w while waiting for reconciliation to finish", errStoppedConcurrently) case <-ctx.Done(): - return fmt.Errorf("%w while waiting for reconciliation to finish", ctx.Err()) + return fmt.Errorf("%w while waiting for reconciliation to finish", context.Cause(ctx)) } } @@ -384,7 +391,7 @@ func (r *Reconciler) Stop() error { return nil } -func (r *Reconciler) reconcileAPIServers(ctx context.Context, updates chan<- updateFunc, stopped <-chan struct{}) error { +func (r *Reconciler) reconcileAPIServers(ctx context.Context, updates chan<- updateFunc) error { client, err := r.clientFactory.GetClient() if err != nil { return err @@ -398,7 +405,7 @@ func (r *Reconciler) reconcileAPIServers(ctx context.Context, updates chan<- upd return false, err } - return false, reconcile(ctx, updates, stopped, func(s *snapshot) { s.apiServers = apiServers }) + return false, reconcile(ctx, updates, func(s *snapshot) { s.apiServers = apiServers }) }) } diff --git a/pkg/component/controller/workerconfig/reconciler_test.go b/pkg/component/controller/workerconfig/reconciler_test.go index c5162eb7373d..8fb65dd4c4a3 100644 --- a/pkg/component/controller/workerconfig/reconciler_test.go +++ b/pkg/component/controller/workerconfig/reconciler_test.go @@ -572,7 +572,7 @@ func TestReconciler_runReconcileLoop(t *testing.T) { leaderElector: &leaderelector.Dummy{Leader: true}, } - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + ctx, cancel := context.WithCancelCause(context.TODO()) // Prepare update channel for two updates. updates, firstDone, secondDone := make(chan updateFunc, 2), make(chan error, 1), make(chan error, 1) @@ -581,7 +581,7 @@ func TestReconciler_runReconcileLoop(t *testing.T) { updates <- func(s *snapshot) chan<- error { return firstDone } // Put in the second update that'll cancel the context. - updates <- func(s *snapshot) chan<- error { cancel(); return secondDone } + updates <- func(s *snapshot) chan<- error { cancel(errStoppedConcurrently); return secondDone } underTest.runReconcileLoop(ctx, updates, nil)