From 5947b435ffea74886d5f0d6751aa2eca61ad0381 Mon Sep 17 00:00:00 2001
From: Armando Ruocco
Date: Wed, 18 Dec 2024 11:51:20 +0100
Subject: [PATCH] refactor: remove b.retryWithRefreshedCluster

Signed-off-by: Armando Ruocco
---
 pkg/management/postgres/backup.go             | 61 +++++-------
 .../postgres/webserver/plugin_backup.go       | 46 +++++----
 .../replicaclusterswitch/reconciler.go        |  4 +-
 pkg/resources/retry.go                        | 23 -----
 pkg/resources/retry_test.go                   | 93 -------------------
 pkg/resources/status/conditions.go            | 26 ++++--
 pkg/resources/status/{update.go => patch.go}  | 20 ++--
 pkg/resources/status/phase.go                 |  2 +-
 8 files changed, 76 insertions(+), 199 deletions(-)
 delete mode 100644 pkg/resources/retry_test.go
 rename pkg/resources/status/{update.go => patch.go} (81%)

diff --git a/pkg/management/postgres/backup.go b/pkg/management/postgres/backup.go
index 6552e73344..b86b08417d 100644
--- a/pkg/management/postgres/backup.go
+++ b/pkg/management/postgres/backup.go
@@ -32,7 +32,6 @@ import (
 	"github.com/cloudnative-pg/machinery/pkg/fileutils"
 	"github.com/cloudnative-pg/machinery/pkg/log"
 	pgTime "github.com/cloudnative-pg/machinery/pkg/postgres/time"
-	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -151,13 +150,6 @@ func (b *BackupCommand) ensureCompatibility() error {
 	return b.barmanBackup.IsCompatible(postgresVers)
 }
 
-func (b *BackupCommand) retryWithRefreshedCluster(
-	ctx context.Context,
-	cb func() error,
-) error {
-	return resources.RetryWithRefreshedResource(ctx, b.Client, b.Cluster, cb)
-}
-
 // run executes the barman-cloud-backup command and updates the status
 // This method will take long time and is supposed to run inside a dedicated
 // goroutine.
@@ -187,18 +179,16 @@ func (b *BackupCommand) run(ctx context.Context) {
 	}
 
 	// add backup failed condition to the cluster
-	if failErr := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.UpdateAndRefresh(
-			ctx,
-			b.Client,
-			b.Cluster,
-			func(cluster *apiv1.Cluster) {
-				meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(err))
-				cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
-			},
-		)
-	}); failErr != nil {
-		b.Log.Error(failErr, "while setting cluster condition for failed backup")
+	if err := status.PatchWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		func(cluster *apiv1.Cluster) {
+			meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(err))
+			cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
+		},
+	); err != nil {
+		b.Log.Error(err, "while setting cluster condition for failed backup")
 		// We do not terminate here because it's more important to properly handle
 		// the backup maintenance activity than putting a condition in the cluster
 	}
@@ -213,9 +203,12 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error {
 	b.Recorder.Event(b.Backup, "Normal", "Starting", "Backup started")
 
 	// Update backup status in cluster conditions on startup
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
-	}); err != nil {
+	if err := status.PatchConditionsWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		apiv1.BackupStartingCondition,
+	); err != nil {
 		b.Log.Error(err, "Error changing backup condition (backup started)")
 		// We do not terminate here because we could still have a good backup
 		// even if we are unable to communicate with the Kubernetes API server
@@ -259,9 +252,12 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error {
 	}
 
 	// Update backup status in cluster conditions on backup completion
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
-	}); err != nil {
+	if err := status.PatchConditionsWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		apiv1.BackupSucceededCondition,
+	); err != nil {
 		b.Log.Error(err, "Can't update the cluster with the completed backup data")
 	}
 
@@ -297,21 +293,12 @@ func (b *BackupCommand) backupMaintenance(ctx context.Context) {
 		b.Log.Error(err, "while deleting Backups not present in the catalog")
 	}
 
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		origCluster := b.Cluster.DeepCopy()
-
-		// Set the first recoverability point and the last successful backup
-		b.Cluster.UpdateBackupTimes(
+	if err := status.PatchWithOptimisticLock(ctx, b.Client, b.Cluster, func(cluster *apiv1.Cluster) {
+		cluster.UpdateBackupTimes(
 			apiv1.BackupMethod(data.GetBackupMethod()),
 			data.GetFirstRecoverabilityPoint(),
 			data.GetLastSuccessfulBackupTime(),
 		)
-
-		// TODO(leonardoce) questo deep equal fa venire la nausea
-		if equality.Semantic.DeepEqual(origCluster.Status, b.Cluster.Status) {
-			return nil
-		}
-		return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster))
 	}); err != nil {
 		b.Log.Error(err, "while setting the firstRecoverabilityPoint and latestSuccessfulBackup")
 	}
diff --git a/pkg/management/postgres/webserver/plugin_backup.go b/pkg/management/postgres/webserver/plugin_backup.go
index 5c10140282..9a72c0cab5 100644
--- a/pkg/management/postgres/webserver/plugin_backup.go
+++ b/pkg/management/postgres/webserver/plugin_backup.go
@@ -33,7 +33,6 @@ import (
 	"github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/repository"
 	"github.com/cloudnative-pg/cloudnative-pg/internal/configuration"
 	"github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres"
-	"github.com/cloudnative-pg/cloudnative-pg/pkg/resources"
 	"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
 )
 
@@ -101,9 +100,12 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) {
 	b.Recorder.Event(b.Backup, "Normal", "Starting", "Backup started")
 
 	// Update backup status in cluster conditions on startup
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
-	}); err != nil {
+	if err := status.PatchConditionsWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		apiv1.BackupStartingCondition,
+	); err != nil {
 		contextLogger.Error(err, "Error changing backup condition (backup started)")
 		// We do not terminate here because we could still have a good backup
 		// even if we are unable to communicate with the Kubernetes API server
@@ -151,9 +153,12 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) {
 	}
 
 	// Update backup status in cluster conditions on backup completion
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
-	}); err != nil {
+	if err := status.PatchConditionsWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		apiv1.BackupSucceededCondition,
+	); err != nil {
 		contextLogger.Error(err, "Can't update the cluster with the completed backup data")
 	}
 }
@@ -175,24 +180,15 @@ func (b *PluginBackupCommand) markBackupAsFailed(ctx context.Context, failure er
 	}
 
 	// add backup failed condition to the cluster
-	if failErr := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.UpdateAndRefresh(
-			ctx,
-			b.Client,
-			b.Cluster,
-			func(cluster *apiv1.Cluster) {
-				meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(failure))
-				cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
-			},
-		)
-	}); failErr != nil {
+	if failErr := status.PatchWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		func(cluster *apiv1.Cluster) {
+			meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(failure))
+			cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
+		},
+	); failErr != nil {
 		contextLogger.Error(failErr, "while setting cluster condition for failed backup")
 	}
 }
-
-func (b *PluginBackupCommand) retryWithRefreshedCluster(
-	ctx context.Context,
-	cb func() error,
-) error {
-	return resources.RetryWithRefreshedResource(ctx, b.Client, b.Cluster, cb)
-}
diff --git a/pkg/reconciler/replicaclusterswitch/reconciler.go b/pkg/reconciler/replicaclusterswitch/reconciler.go
index 8ad09f38d3..fd185a7e49 100644
--- a/pkg/reconciler/replicaclusterswitch/reconciler.go
+++ b/pkg/reconciler/replicaclusterswitch/reconciler.go
@@ -90,7 +90,7 @@ func startTransition(ctx context.Context, cli client.Client, cluster *apiv1.Clus
 		return nil, fmt.Errorf("while fencing primary cluster to demote it: %w", err)
 	}
 
-	if err := status.UpdateAndRefresh(
+	if err := status.PatchWithOptimisticLock(
 		ctx,
 		cli,
 		cluster,
@@ -139,7 +139,7 @@ func cleanupTransitionMetadata(ctx context.Context, cli client.Client, cluster *
 		}
 	}
 
-	return status.UpdateAndRefresh(
+	return status.PatchWithOptimisticLock(
 		ctx,
 		cli,
 		cluster,
diff --git a/pkg/resources/retry.go b/pkg/resources/retry.go
index 4a0d13aa1b..bc1e214387 100644
--- a/pkg/resources/retry.go
+++ b/pkg/resources/retry.go
@@ -16,28 +16,5 @@ limitations under the License.
 
 package resources
 
-import (
-	"context"
-
-	"k8s.io/client-go/util/retry"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-)
-
 // RetryAlways is a function that always returns true on any error encountered
 func RetryAlways(_ error) bool { return true }
-
-// RetryWithRefreshedResource updates the resource before invoking the cb
-func RetryWithRefreshedResource(
-	ctx context.Context,
-	cli client.Client,
-	resource client.Object,
-	cb func() error,
-) error {
-	return retry.OnError(retry.DefaultBackoff, RetryAlways, func() error {
-		if err := cli.Get(ctx, client.ObjectKeyFromObject(resource), resource); err != nil {
-			return err
-		}
-
-		return cb()
-	})
-}
diff --git a/pkg/resources/retry_test.go b/pkg/resources/retry_test.go
deleted file mode 100644
index 7379c2905c..0000000000
--- a/pkg/resources/retry_test.go
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
-Copyright The CloudNativePG Contributors
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package resources
-
-import (
-	appsv1 "k8s.io/api/apps/v1"
-	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/utils/ptr"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/client/fake"
-
-	schemeBuilder "github.com/cloudnative-pg/cloudnative-pg/internal/scheme"
-
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
-)
-
-var _ = Describe("RetryWithRefreshedResource", func() {
-	const (
-		name      = "test-deployment"
-		namespace = "default"
-	)
-
-	var (
-		fakeClient   client.Client
-		testResource *appsv1.Deployment
-	)
-
-	BeforeEach(func() {
-		fakeClient = fake.NewClientBuilder().WithScheme(schemeBuilder.BuildWithAllKnownScheme()).Build()
-		testResource = &appsv1.Deployment{
-			ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
-			Spec: appsv1.DeploymentSpec{
-				Replicas: ptr.To(int32(1)),
-				Selector: &metav1.LabelSelector{
-					MatchLabels: map[string]string{"app": "test"},
-				},
-				Template: corev1.PodTemplateSpec{
-					ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "test"}},
-					Spec: corev1.PodSpec{
-						Containers: []corev1.Container{
-							{
-								Name:  "test-container",
-								Image: "nginx",
-							},
-						},
-					},
-				},
-			},
-		}
-	})
-
-	Context("when client.Get succeeds", func() {
-		BeforeEach(func(ctx SpecContext) {
-			// Set up the fake client to return the resource without error
-			Expect(fakeClient.Create(ctx, testResource)).To(Succeed())
-
-			modified := testResource.DeepCopy()
-			modified.Spec.Replicas = ptr.To(int32(10))
-			err := fakeClient.Update(ctx, modified)
-			Expect(err).ToNot(HaveOccurred())
-		})
-
-		It("should invoke the callback without error and update the resource", func(ctx SpecContext) {
-			// ensure that the local deployment contains the old value
-			Expect(*testResource.Spec.Replicas).To(Equal(int32(1)))
-
-			cb := func() error {
-				return nil
-			}
-
-			// ensure that now the deployment contains the new value
-			err := RetryWithRefreshedResource(ctx, fakeClient, testResource, cb)
-			Expect(err).ToNot(HaveOccurred())
-			Expect(*testResource.Spec.Replicas).To(Equal(int32(10)))
-		})
-	})
-})
diff --git a/pkg/resources/status/conditions.go b/pkg/resources/status/conditions.go
index 47b4427b5f..b3d4045626 100644
--- a/pkg/resources/status/conditions.go
+++ b/pkg/resources/status/conditions.go
@@ -43,14 +43,6 @@ func PatchConditionsWithOptimisticLock(
 		return nil
 	}
 
-	applyConditions := func(cluster *apiv1.Cluster) bool {
-		changed := false
-		for _, c := range conditions {
-			changed = changed || meta.SetStatusCondition(&cluster.Status.Conditions, c)
-		}
-		return changed
-	}
-
 	var currentCluster apiv1.Cluster
 	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 		if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &currentCluster); err != nil {
@@ -70,12 +62,26 @@ func PatchConditionsWithOptimisticLock(
 			return err
 		}
 
-		updatedCluster.DeepCopyInto(cluster)
-
 		return nil
 	}); err != nil {
 		return fmt.Errorf("while updating conditions: %w", err)
 	}
 
+	if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &currentCluster); err != nil {
+		return err
+	}
+
+	cluster.Status.Conditions = currentCluster.Status.Conditions
+
 	return nil
 }
+
+// applyConditions will apply the passed conditions to the cluster status.
+// returns true if there was a change to the existing object status, false otherwise.
+func applyConditions(cluster *apiv1.Cluster, conditions ...metav1.Condition) bool {
+	changed := false
+	for _, c := range conditions {
+		changed = meta.SetStatusCondition(&cluster.Status.Conditions, c) || changed
+	}
+	return changed
+}
diff --git a/pkg/resources/status/update.go b/pkg/resources/status/patch.go
similarity index 81%
rename from pkg/resources/status/update.go
rename to pkg/resources/status/patch.go
index 469eacc354..53e911b1c3 100644
--- a/pkg/resources/status/update.go
+++ b/pkg/resources/status/patch.go
@@ -27,28 +27,28 @@ import (
 	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
 )
 
-// UpdateAndRefresh updates the status of the cluster using the passed
-// transaction function.
+// PatchWithOptimisticLock updates the status of the cluster using the passed
+// mutator function.
 // Important: after successfully updating the status, this
 // function refreshes it into the passed cluster
-func UpdateAndRefresh(
+func PatchWithOptimisticLock(
 	ctx context.Context,
 	c client.Client,
 	cluster *apiv1.Cluster,
-	tx func(cluster *apiv1.Cluster),
+	mutator func(cluster *apiv1.Cluster),
 ) error {
 	if cluster == nil {
 		return nil
 	}
 
+	var currentCluster apiv1.Cluster
 	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
-		var currentCluster apiv1.Cluster
 		if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &currentCluster); err != nil {
 			return err
 		}
 
 		updatedCluster := currentCluster.DeepCopy()
-		tx(updatedCluster)
+		mutator(updatedCluster)
 
 		if equality.Semantic.DeepEqual(currentCluster.Status, updatedCluster.Status) {
 			return nil
@@ -63,12 +63,16 @@
 			return err
 		}
 
-		updatedCluster.DeepCopyInto(cluster)
-
 		return nil
 	}); err != nil {
 		return fmt.Errorf("while updating conditions: %w", err)
 	}
 
+	if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &currentCluster); err != nil {
+		return err
+	}
+
+	currentCluster.Status.DeepCopyInto(&cluster.Status)
+
 	return nil
 }
diff --git a/pkg/resources/status/phase.go b/pkg/resources/status/phase.go
index 325af71ea0..bac80933c5 100644
--- a/pkg/resources/status/phase.go
+++ b/pkg/resources/status/phase.go
@@ -51,7 +51,7 @@ func RegisterPhaseWithOrigCluster(
 	phase string,
 	reason string,
 ) error {
-	if err := UpdateAndRefresh(
+	if err := PatchWithOptimisticLock(
 		ctx,
 		cli,
 		modifiedCluster,
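
Reviewer note: below is a minimal, hypothetical sketch of the call-site pattern this refactor converges on. The retry/refresh loop now lives inside the status helpers, so callers hand over a plain mutator instead of wrapping the call in b.retryWithRefreshedCluster. The function markFailedExample and its parameters are illustrative only; the helper names and signatures are the ones introduced by this patch.

package example

import (
	"context"
	"time"

	pgTime "github.com/cloudnative-pg/machinery/pkg/postgres/time"
	"k8s.io/apimachinery/pkg/api/meta"
	"sigs.k8s.io/controller-runtime/pkg/client"

	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
	"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
)

// markFailedExample records a failed backup in the cluster status. The
// helper re-reads the cluster, applies the mutator to a deep copy, patches
// the status, retries on conflict, and refreshes the passed object, so the
// caller no longer manages any of that.
func markFailedExample(ctx context.Context, c client.Client, cluster *apiv1.Cluster, backupErr error) error {
	return status.PatchWithOptimisticLock(
		ctx,
		c,
		cluster,
		func(cluster *apiv1.Cluster) {
			meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(backupErr))
			cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
		},
	)
}

Call sites that only need to flip well-known conditions use the narrower status.PatchConditionsWithOptimisticLock(ctx, c, cluster, apiv1.BackupStartingCondition) form instead, as the backup.go and plugin_backup.go hunks show.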
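The hunks above elide the middle of the retry loop where the status patch is actually issued. For reviewers unfamiliar with the pattern, here is a generic sketch of an optimistic-lock status patch assembled from the pieces visible in the diff (RetryOnConflict, Get, DeepCopy, DeepEqual); the use of client.MergeFromWithOptimisticLock is an assumption about the elided code, not a quote of it.

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"

	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
)

// patchStatusSketch: re-read the object, mutate a deep copy, and patch the
// status with an optimistic lock, so that a stale resourceVersion surfaces
// as a conflict error that RetryOnConflict converts into another round.
func patchStatusSketch(
	ctx context.Context,
	c client.Client,
	cluster *apiv1.Cluster,
	mutator func(*apiv1.Cluster),
) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		var current apiv1.Cluster
		if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &current); err != nil {
			return err
		}

		updated := current.DeepCopy()
		mutator(updated)

		// Avoid a useless API round trip when the mutator changed nothing.
		if equality.Semantic.DeepEqual(current.Status, updated.Status) {
			return nil
		}

		// The optimistic-lock option adds the resourceVersion precondition
		// to the merge patch (again, an assumption about the elided code).
		return c.Status().Patch(ctx, updated, client.MergeFromWithOptions(
			&current,
			client.MergeFromWithOptimisticLock{},
		))
	})
}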