From 67cc5473f3ca30f47d12860a01d57d29b2c47039 Mon Sep 17 00:00:00 2001
From: Leonardo Cecchi
Date: Wed, 18 Dec 2024 17:47:20 +0100
Subject: [PATCH] fix: use optimistic locking when updating conditions (#6328)

Kubernetes does not implement strategic merge for CRDs, and every JSON
merge patch will replace the condition set with a new one. It is
possible for a proposed patch to start from a Cluster that is not
up-to-date, and in that case the conditions will be reverted to an
older status.

This patch fixes this race condition by encapsulating the update in a
merge patch that requires optimistic locking, and by retrying the
conditions update when a conflict is detected.

Fixes: #6317

Signed-off-by: Leonardo Cecchi
Signed-off-by: Armando Ruocco
Signed-off-by: Marco Nenciarini
Co-authored-by: Armando Ruocco
Co-authored-by: Marco Nenciarini
---
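Reviewer note (this area, between the "---" marker and the diffstat, is
discarded by git am): the revert described above happens because, with no
strategic merge for CRDs, status.conditions travels as a plain JSON array,
and RFC 7386 merge patches replace arrays wholesale. The toy program below
reproduces the failure mode. It uses github.com/evanphx/json-patch/v5, a
common merge-patch library in the controller-runtime ecosystem (the
dependency choice is an assumption here), and every value is made up:

package main

import (
	"fmt"

	jsonpatch "github.com/evanphx/json-patch/v5"
)

func main() {
	// What the API server currently stores: two conditions.
	current := []byte(`{"status":{"conditions":[
		{"type":"Backup","status":"False"},
		{"type":"Ready","status":"True"}]}}`)

	// A patch computed by a writer holding a stale copy that only
	// contained the Backup condition.
	stalePatch := []byte(`{"status":{"conditions":[
		{"type":"Backup","status":"True"}]}}`)

	merged, err := jsonpatch.MergePatch(current, stalePatch)
	if err != nil {
		panic(err)
	}

	// Prints a conditions array without Ready=True: the array was
	// replaced, not merged, so the newer condition is silently lost.
	fmt.Println(string(merged))
}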
"github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres/webserver/client/remote" "github.com/cloudnative-pg/cloudnative-pg/pkg/reconciler/backup/volumesnapshot" "github.com/cloudnative-pg/cloudnative-pg/pkg/reconciler/persistentvolumeclaim" + resourcestatus "github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status" "github.com/cloudnative-pg/cloudnative-pg/pkg/specs" "github.com/cloudnative-pg/cloudnative-pg/pkg/utils" ) @@ -417,7 +417,12 @@ func (r *BackupReconciler) reconcileSnapshotBackup( } } - if errCond := conditions.Patch(ctx, r.Client, cluster, apiv1.BackupStartingCondition); errCond != nil { + if errCond := resourcestatus.PatchConditionsWithOptimisticLock( + ctx, + r.Client, + cluster, + apiv1.BackupStartingCondition, + ); errCond != nil { contextLogger.Error(errCond, "Error while updating backup condition (backup starting)") } @@ -440,7 +445,12 @@ func (r *BackupReconciler) reconcileSnapshotBackup( // and un-fence the Pod contextLogger.Error(err, "while executing snapshot backup") // Update backup status in cluster conditions - if errCond := conditions.Patch(ctx, r.Client, cluster, apiv1.BuildClusterBackupFailedCondition(err)); errCond != nil { + if errCond := resourcestatus.PatchConditionsWithOptimisticLock( + ctx, + r.Client, + cluster, + apiv1.BuildClusterBackupFailedCondition(err), + ); errCond != nil { contextLogger.Error(errCond, "Error while updating backup condition (backup snapshot failed)") } @@ -453,7 +463,12 @@ func (r *BackupReconciler) reconcileSnapshotBackup( return res, nil } - if err := conditions.Patch(ctx, r.Client, cluster, apiv1.BackupSucceededCondition); err != nil { + if err := resourcestatus.PatchConditionsWithOptimisticLock( + ctx, + r.Client, + cluster, + apiv1.BackupSucceededCondition, + ); err != nil { contextLogger.Error(err, "Can't update the cluster with the completed snapshot backup data") } @@ -633,7 +648,12 @@ func startInstanceManagerBackup( status.CommandError = stdout // Update backup status in cluster conditions - if errCond := conditions.Patch(ctx, client, cluster, apiv1.BuildClusterBackupFailedCondition(err)); errCond != nil { + if errCond := resourcestatus.PatchConditionsWithOptimisticLock( + ctx, + client, + cluster, + apiv1.BuildClusterBackupFailedCondition(err), + ); errCond != nil { log.FromContext(ctx).Error(errCond, "Error while updating backup condition (backup failed)") } return postgres.PatchBackupStatusAndRetry(ctx, client, backup) diff --git a/pkg/conditions/conditions.go b/pkg/conditions/conditions.go deleted file mode 100644 index 768ac02d49..0000000000 --- a/pkg/conditions/conditions.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -Copyright The CloudNativePG Contributors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package conditions - -import ( - "context" - - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" -) - -// Patch will patch a particular condition in cluster status. 
-func Patch( - ctx context.Context, - c client.Client, - cluster *apiv1.Cluster, - condition *metav1.Condition, -) error { - if cluster == nil || condition == nil { - return nil - } - - existingCluster := cluster.DeepCopy() - if changed := meta.SetStatusCondition(&cluster.Status.Conditions, *condition); changed { - // To avoid conflict using patch instead of update - if err := c.Status().Patch(ctx, cluster, client.MergeFrom(existingCluster)); err != nil { - return err - } - } - - return nil -} diff --git a/pkg/conditions/doc.go b/pkg/conditions/doc.go deleted file mode 100644 index acecc6fc10..0000000000 --- a/pkg/conditions/doc.go +++ /dev/null @@ -1,19 +0,0 @@ -/* -Copyright The CloudNativePG Contributors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package conditions contains functions useful to update the conditions -// on the resources managed by the operator -package conditions diff --git a/pkg/management/postgres/backup.go b/pkg/management/postgres/backup.go index 46ff344b29..f79c4d4a9a 100644 --- a/pkg/management/postgres/backup.go +++ b/pkg/management/postgres/backup.go @@ -32,6 +32,7 @@ import ( "github.com/cloudnative-pg/machinery/pkg/fileutils" "github.com/cloudnative-pg/machinery/pkg/log" pgTime "github.com/cloudnative-pg/machinery/pkg/postgres/time" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -41,9 +42,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" - "github.com/cloudnative-pg/cloudnative-pg/pkg/conditions" "github.com/cloudnative-pg/cloudnative-pg/pkg/postgres" "github.com/cloudnative-pg/cloudnative-pg/pkg/resources" + "github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status" // this is needed to correctly open the sql connection with the pgx driver _ "github.com/jackc/pgx/v5/stdlib" @@ -187,12 +188,15 @@ func (b *BackupCommand) run(ctx context.Context) { // add backup failed condition to the cluster if failErr := b.retryWithRefreshedCluster(ctx, func() error { - origCluster := b.Cluster.DeepCopy() - - meta.SetStatusCondition(&b.Cluster.Status.Conditions, *apiv1.BuildClusterBackupFailedCondition(err)) - - b.Cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339) - return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster)) + return status.PatchWithOptimisticLock( + ctx, + b.Client, + b.Cluster, + func(cluster *apiv1.Cluster) { + meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(err)) + cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339) + }, + ) }); failErr != nil { b.Log.Error(failErr, "while setting cluster condition for failed backup") // We do not terminate here because it's more important to properly handle @@ -210,7 +214,7 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error { // Update backup status in cluster conditions on startup if err := 
b.retryWithRefreshedCluster(ctx, func() error { - return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition) + return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition) }); err != nil { b.Log.Error(err, "Error changing backup condition (backup started)") // We do not terminate here because we could still have a good backup @@ -256,7 +260,7 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error { // Update backup status in cluster conditions on backup completion if err := b.retryWithRefreshedCluster(ctx, func() error { - return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition) + return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition) }); err != nil { b.Log.Error(err, "Can't update the cluster with the completed backup data") } @@ -303,7 +307,7 @@ func (b *BackupCommand) backupMaintenance(ctx context.Context) { data.GetLastSuccessfulBackupTime(), ) - if reflect.DeepEqual(origCluster.Status, b.Cluster.Status) { + if equality.Semantic.DeepEqual(origCluster.Status, b.Cluster.Status) { return nil } return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster)) diff --git a/pkg/management/postgres/backup_test.go b/pkg/management/postgres/backup_test.go index 18307bc791..8ff4796c72 100644 --- a/pkg/management/postgres/backup_test.go +++ b/pkg/management/postgres/backup_test.go @@ -125,6 +125,7 @@ var _ = Describe("testing backup command", func() { Client: fake.NewClientBuilder(). WithScheme(scheme.BuildWithAllKnownScheme()). WithObjects(cluster, backup). + WithStatusSubresource(cluster, backup). Build(), Recorder: &record.FakeRecorder{}, Env: os.Environ(), diff --git a/pkg/management/postgres/webserver/local.go b/pkg/management/postgres/webserver/local.go index 0d15f851ca..27b15db9e1 100644 --- a/pkg/management/postgres/webserver/local.go +++ b/pkg/management/postgres/webserver/local.go @@ -33,9 +33,9 @@ import ( apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "github.com/cloudnative-pg/cloudnative-pg/internal/management/cache" - "github.com/cloudnative-pg/cloudnative-pg/pkg/conditions" "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres" "github.com/cloudnative-pg/cloudnative-pg/pkg/management/url" + "github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status" "github.com/cloudnative-pg/cloudnative-pg/pkg/utils" ) @@ -245,9 +245,9 @@ type ArchiveStatusRequest struct { Error string `json:"error,omitempty"` } -func (asr *ArchiveStatusRequest) getContinuousArchivingCondition() *metav1.Condition { +func (asr *ArchiveStatusRequest) getContinuousArchivingCondition() metav1.Condition { if asr.Error != "" { - return &metav1.Condition{ + return metav1.Condition{ Type: string(apiv1.ConditionContinuousArchiving), Status: metav1.ConditionFalse, Reason: string(apiv1.ConditionReasonContinuousArchivingFailing), @@ -255,7 +255,7 @@ func (asr *ArchiveStatusRequest) getContinuousArchivingCondition() *metav1.Condi } } - return &metav1.Condition{ + return metav1.Condition{ Type: string(apiv1.ConditionContinuousArchiving), Status: metav1.ConditionTrue, Reason: string(apiv1.ConditionReasonContinuousArchivingSuccess), @@ -283,7 +283,12 @@ func (ws *localWebserverEndpoints) setWALArchiveStatusCondition(w http.ResponseW return } - if errCond := conditions.Patch(ctx, ws.typedClient, cluster, asr.getContinuousArchivingCondition()); errCond != nil { + if errCond := status.PatchConditionsWithOptimisticLock( + ctx, + ws.typedClient, + 
cluster, + asr.getContinuousArchivingCondition(), + ); errCond != nil { contextLogger.Error(errCond, "Error changing wal archiving condition", "condition", asr.getContinuousArchivingCondition()) http.Error( diff --git a/pkg/management/postgres/webserver/plugin_backup.go b/pkg/management/postgres/webserver/plugin_backup.go index 5d1ad1562b..2e6f58f5b6 100644 --- a/pkg/management/postgres/webserver/plugin_backup.go +++ b/pkg/management/postgres/webserver/plugin_backup.go @@ -32,9 +32,9 @@ import ( pluginClient "github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/client" "github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/repository" "github.com/cloudnative-pg/cloudnative-pg/internal/configuration" - "github.com/cloudnative-pg/cloudnative-pg/pkg/conditions" "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres" "github.com/cloudnative-pg/cloudnative-pg/pkg/resources" + "github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status" ) // PluginBackupCommand represent a backup command that is being executed @@ -102,7 +102,7 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) { // Update backup status in cluster conditions on startup if err := b.retryWithRefreshedCluster(ctx, func() error { - return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition) + return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition) }); err != nil { contextLogger.Error(err, "Error changing backup condition (backup started)") // We do not terminate here because we could still have a good backup @@ -152,7 +152,7 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) { // Update backup status in cluster conditions on backup completion if err := b.retryWithRefreshedCluster(ctx, func() error { - return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition) + return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition) }); err != nil { contextLogger.Error(err, "Can't update the cluster with the completed backup data") } @@ -176,12 +176,15 @@ func (b *PluginBackupCommand) markBackupAsFailed(ctx context.Context, failure er // add backup failed condition to the cluster if failErr := b.retryWithRefreshedCluster(ctx, func() error { - origCluster := b.Cluster.DeepCopy() - - meta.SetStatusCondition(&b.Cluster.Status.Conditions, *apiv1.BuildClusterBackupFailedCondition(failure)) - - b.Cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339) - return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster)) + return status.PatchWithOptimisticLock( + ctx, + b.Client, + b.Cluster, + func(cluster *apiv1.Cluster) { + meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(failure)) + cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339) + }, + ) }); failErr != nil { contextLogger.Error(failErr, "while setting cluster condition for failed backup") } diff --git a/pkg/reconciler/replicaclusterswitch/reconciler.go b/pkg/reconciler/replicaclusterswitch/reconciler.go index 15342e9adc..fd185a7e49 100644 --- a/pkg/reconciler/replicaclusterswitch/reconciler.go +++ b/pkg/reconciler/replicaclusterswitch/reconciler.go @@ -31,6 +31,7 @@ import ( apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres/webserver/client/remote" "github.com/cloudnative-pg/cloudnative-pg/pkg/postgres" + 
"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status" "github.com/cloudnative-pg/cloudnative-pg/pkg/utils" ) @@ -89,28 +90,33 @@ func startTransition(ctx context.Context, cli client.Client, cluster *apiv1.Clus return nil, fmt.Errorf("while fencing primary cluster to demote it: %w", err) } - origCluster := cluster.DeepCopy() - meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ - Type: conditionDesignatedPrimaryTransition, - Status: metav1.ConditionFalse, - Reason: "ReplicaClusterAfterCreation", - Message: "Enabled external cluster after a node was generated", - }) - meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ - Type: conditionFence, - Status: metav1.ConditionTrue, - Reason: "ReplicaClusterAfterCreation", - Message: "Enabled external cluster after a node was generated", - }) - meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ - Type: ConditionReplicaClusterSwitch, - Status: metav1.ConditionFalse, - Reason: "ReplicaEnabledSetTrue", - Message: "Starting the Replica cluster transition", - }) - - cluster.Status.SwitchReplicaClusterStatus.InProgress = true - if err := cli.Status().Patch(ctx, cluster, client.MergeFrom(origCluster)); err != nil { + if err := status.PatchWithOptimisticLock( + ctx, + cli, + cluster, + func(cluster *apiv1.Cluster) { + meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: conditionDesignatedPrimaryTransition, + Status: metav1.ConditionFalse, + Reason: "ReplicaClusterAfterCreation", + Message: "Enabled external cluster after a node was generated", + }) + meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: conditionFence, + Status: metav1.ConditionTrue, + Reason: "ReplicaClusterAfterCreation", + Message: "Enabled external cluster after a node was generated", + }) + meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: ConditionReplicaClusterSwitch, + Status: metav1.ConditionFalse, + Reason: "ReplicaEnabledSetTrue", + Message: "Starting the Replica cluster transition", + }) + + cluster.Status.SwitchReplicaClusterStatus.InProgress = true + }, + ); err != nil { return nil, err } @@ -132,18 +138,23 @@ func cleanupTransitionMetadata(ctx context.Context, cli client.Client, cluster * return err } } - origCluster := cluster.DeepCopy() - meta.RemoveStatusCondition(&cluster.Status.Conditions, conditionDesignatedPrimaryTransition) - meta.RemoveStatusCondition(&cluster.Status.Conditions, conditionFence) - meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ - Type: ConditionReplicaClusterSwitch, - Status: metav1.ConditionTrue, - Reason: "ReplicaEnabledSetTrue", - Message: "Completed the Replica cluster transition", - }) - cluster.Status.SwitchReplicaClusterStatus.InProgress = false - - return cli.Status().Patch(ctx, cluster, client.MergeFrom(origCluster)) + + return status.PatchWithOptimisticLock( + ctx, + cli, + cluster, + func(cluster *apiv1.Cluster) { + meta.RemoveStatusCondition(&cluster.Status.Conditions, conditionDesignatedPrimaryTransition) + meta.RemoveStatusCondition(&cluster.Status.Conditions, conditionFence) + meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: ConditionReplicaClusterSwitch, + Status: metav1.ConditionTrue, + Reason: "ReplicaEnabledSetTrue", + Message: "Completed the Replica cluster transition", + }) + cluster.Status.SwitchReplicaClusterStatus.InProgress = false + }, + ) } func reconcileDemotionToken( diff --git a/pkg/resources/status/conditions.go 
diff --git a/pkg/resources/status/conditions.go b/pkg/resources/status/conditions.go
new file mode 100644
index 0000000000..54b09a056b
--- /dev/null
+++ b/pkg/resources/status/conditions.go
@@ -0,0 +1,81 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package status
+
+import (
+	"context"
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/util/retry"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+)
+
+// PatchConditionsWithOptimisticLock updates the given conditions in the
+// cluster status, and may refresh the conditions in the passed cluster
+// with the latest ones that were found on the API server.
+// This function is needed because Kubernetes still doesn't support strategic merge
+// for CRDs (see https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/).
+func PatchConditionsWithOptimisticLock(
+	ctx context.Context,
+	c client.Client,
+	cluster *apiv1.Cluster,
+	conditions ...metav1.Condition,
+) error {
+	if cluster == nil || len(conditions) == 0 {
+		return nil
+	}
+
+	applyConditions := func(cluster *apiv1.Cluster) bool {
+		changed := false
+		for _, condition := range conditions {
+			changed = meta.SetStatusCondition(&cluster.Status.Conditions, condition) || changed
+		}
+		return changed
+	}
+
+	var currentCluster apiv1.Cluster
+	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &currentCluster); err != nil {
+			return err
+		}
+
+		updatedCluster := currentCluster.DeepCopy()
+		if changed := applyConditions(updatedCluster); !changed {
+			return nil
+		}
+
+		if err := c.Status().Patch(
+			ctx,
+			updatedCluster,
+			client.MergeFromWithOptions(&currentCluster, client.MergeFromWithOptimisticLock{}),
+		); err != nil {
+			return err
+		}
+
+		cluster.Status.Conditions = updatedCluster.Status.Conditions
+
+		return nil
+	}); err != nil {
+		return fmt.Errorf("while updating conditions: %w", err)
+	}
+
+	return nil
+}
diff --git a/pkg/resources/status/phase.go b/pkg/resources/status/phase.go
index 684eefd26c..bac80933c5 100644
--- a/pkg/resources/status/phase.go
+++ b/pkg/resources/status/phase.go
@@ -18,7 +18,7 @@ package status
 
 import (
 	"context"
-	"reflect"
+	"fmt"
 
 	"github.com/cloudnative-pg/machinery/pkg/log"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -51,47 +51,50 @@ func RegisterPhaseWithOrigCluster(
 	phase string,
 	reason string,
 ) error {
-	contextLogger := log.FromContext(ctx)
-
-	// we ensure that the modifiedCluster conditions aren't nil before operating
-	if modifiedCluster.Status.Conditions == nil {
-		modifiedCluster.Status.Conditions = []metav1.Condition{}
+	if err := PatchWithOptimisticLock(
+		ctx,
+		cli,
+		modifiedCluster,
+		func(cluster *apiv1.Cluster) {
+			if cluster.Status.Conditions == nil {
+				cluster.Status.Conditions = []metav1.Condition{}
+			}
+
+			cluster.Status.Phase = phase
+			cluster.Status.PhaseReason = reason
+
+			condition := metav1.Condition{
+				Type:    string(apiv1.ConditionClusterReady),
+				Status:  metav1.ConditionFalse,
+				Reason:  string(apiv1.ClusterIsNotReady),
+				Message: "Cluster Is Not Ready",
+			}
+
+			if cluster.Status.Phase == apiv1.PhaseHealthy {
+				condition = metav1.Condition{
+					Type:    string(apiv1.ConditionClusterReady),
+					Status:  metav1.ConditionTrue,
+					Reason:  string(apiv1.ClusterReady),
+					Message: "Cluster is Ready",
+				}
+			}
+
+			meta.SetStatusCondition(&cluster.Status.Conditions, condition)
+		},
+	); err != nil {
+		return fmt.Errorf("while updating phase: %w", err)
 	}
 
-	modifiedCluster.Status.Phase = phase
-	modifiedCluster.Status.PhaseReason = reason
+	contextLogger := log.FromContext(ctx)
 
-	condition := metav1.Condition{
-		Type:    string(apiv1.ConditionClusterReady),
-		Status:  metav1.ConditionFalse,
-		Reason:  string(apiv1.ClusterIsNotReady),
-		Message: "Cluster Is Not Ready",
-	}
+	modifiedPhase := modifiedCluster.Status.Phase
+	origPhase := origCluster.Status.Phase
 
-	if modifiedCluster.Status.Phase == apiv1.PhaseHealthy {
-		condition = metav1.Condition{
-			Type:    string(apiv1.ConditionClusterReady),
-			Status:  metav1.ConditionTrue,
-			Reason:  string(apiv1.ClusterReady),
-			Message: "Cluster is Ready",
-		}
+	if modifiedPhase != apiv1.PhaseHealthy && origPhase == apiv1.PhaseHealthy {
+		contextLogger.Info("Cluster is not healthy")
 	}
-
-	meta.SetStatusCondition(&modifiedCluster.Status.Conditions, condition)
-
-	if !reflect.DeepEqual(origCluster, modifiedCluster) {
-		modifiedPhase := modifiedCluster.Status.Phase
-		origPhase := origCluster.Status.Phase
-
-		if modifiedPhase != apiv1.PhaseHealthy && origPhase == apiv1.PhaseHealthy {
-			contextLogger.Info("Cluster is not healthy")
-		}
-		if modifiedPhase == apiv1.PhaseHealthy && origPhase != apiv1.PhaseHealthy {
-			contextLogger.Info("Cluster is healthy")
-		}
-		if err := cli.Status().Patch(ctx, modifiedCluster, client.MergeFrom(origCluster)); err != nil {
-			return err
-		}
+	if modifiedPhase == apiv1.PhaseHealthy && origPhase != apiv1.PhaseHealthy {
+		contextLogger.Info("Cluster is healthy")
 	}
 
 	return nil
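Reviewer note: RegisterPhaseWithOrigCluster above now delegates its
read-modify-write cycle to PatchWithOptimisticLock, defined below. Because
that helper copies the refreshed status back into the object it was given,
the healthy/unhealthy log lines can still compare the stored phase against
origCluster after the patch. The WithStatusSubresource call added to
backup_test.go earlier in this patch matters here too: controller-runtime's
fake client rejects Status().Patch on objects that were not registered with
a status subresource. A test sketch under these assumptions (the test name,
namespace, and the internal/scheme import path are illustrative):

package status_test // hypothetical test, for illustration only

import (
	"context"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
	"github.com/cloudnative-pg/cloudnative-pg/internal/scheme"
	"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
)

func TestPatchWithOptimisticLock(t *testing.T) {
	cluster := &apiv1.Cluster{
		ObjectMeta: metav1.ObjectMeta{Name: "cluster-example", Namespace: "default"},
	}
	cli := fake.NewClientBuilder().
		WithScheme(scheme.BuildWithAllKnownScheme()).
		WithObjects(cluster).
		WithStatusSubresource(cluster).
		Build()

	if err := status.PatchWithOptimisticLock(
		context.Background(),
		cli,
		cluster,
		func(cluster *apiv1.Cluster) {
			cluster.Status.Phase = apiv1.PhaseHealthy
		},
	); err != nil {
		t.Fatal(err)
	}

	// The helper refreshes the status into the object we passed in.
	if cluster.Status.Phase != apiv1.PhaseHealthy {
		t.Fatalf("expected phase %q, got %q", apiv1.PhaseHealthy, cluster.Status.Phase)
	}
}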
diff --git a/pkg/resources/status/update.go b/pkg/resources/status/update.go
new file mode 100644
index 0000000000..0543292d9e
--- /dev/null
+++ b/pkg/resources/status/update.go
@@ -0,0 +1,73 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package status
+
+import (
+	"context"
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/client-go/util/retry"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+)
+
+// PatchWithOptimisticLock updates the status of the cluster using the passed
+// transaction function.
+//
+// Important: after successfully updating the status, this function also
+// refreshes the new status into the passed cluster object.
+func PatchWithOptimisticLock(
+	ctx context.Context,
+	c client.Client,
+	cluster *apiv1.Cluster,
+	tx func(cluster *apiv1.Cluster),
+) error {
+	if cluster == nil {
+		return nil
+	}
+
+	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		var currentCluster apiv1.Cluster
+		if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &currentCluster); err != nil {
+			return err
+		}
+
+		updatedCluster := currentCluster.DeepCopy()
+		tx(updatedCluster)
+
+		if equality.Semantic.DeepEqual(currentCluster.Status, updatedCluster.Status) {
+			return nil
+		}
+
+		if err := c.Status().Patch(
+			ctx,
+			updatedCluster,
+			client.MergeFromWithOptions(&currentCluster, client.MergeFromWithOptimisticLock{}),
+		); err != nil {
+			return err
+		}
+
+		cluster.Status = updatedCluster.Status
+
+		return nil
+	}); err != nil {
+		return fmt.Errorf("while updating the cluster status: %w", err)
+	}
+
+	return nil
+}
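Closing note on the retry budget: retry.RetryOnConflict with
retry.DefaultBackoff performs a small, bounded number of attempts (four
steps with exponential backoff in current client-go, if memory serves), so
a heavily contended status update can still surface a conflict error; the
call sites in this patch deliberately treat that as a logged, non-fatal
event. A writer that needed better odds could drive RetryOnConflict with
its own backoff, sketched here with made-up values:

package example // illustration only; the backoff values are invented

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
)

// wideBackoff trades latency for a better chance of winning a heavily
// contended status update: eight attempts instead of the default four.
var wideBackoff = wait.Backoff{
	Steps:    8,
	Duration: 10 * time.Millisecond,
	Factor:   2.0,
	Jitter:   0.1,
}

func retryStatusUpdate(update func() error) error {
	// RetryOnConflict re-runs update only when it returns a Kubernetes
	// "Conflict" API error, i.e. when the optimistic lock was lost.
	return retry.RetryOnConflict(wideBackoff, update)
}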