refactor: remove b.retryWithRefreshedCluster
Signed-off-by: Armando Ruocco <[email protected]>
armru committed Dec 18, 2024
1 parent b098e6a commit 5947b43
Showing 8 changed files with 76 additions and 199 deletions.
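In short: call sites that wrapped status writes in b.retryWithRefreshedCluster (a thin shim over resources.RetryWithRefreshedResource, deleted further down) now call the pkg/resources/status helpers directly. A minimal sketch of the resulting call-site shape follows; the function name and the apiv1 import alias are illustrative assumptions, not code from this commit.

```go
// Illustrative only: the call-site shape after this commit. The wrapper
// b.retryWithRefreshedCluster is gone; the status helper is called directly
// and owns the refresh-and-retry behavior.
package example

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
	"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
)

func setBackupStartingCondition(ctx context.Context, c client.Client, cluster *apiv1.Cluster) error {
	return status.PatchConditionsWithOptimisticLock(
		ctx,
		c,
		cluster,
		apiv1.BackupStartingCondition,
	)
}
```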
61 changes: 24 additions & 37 deletions pkg/management/postgres/backup.go
@@ -32,7 +32,6 @@ import (
 	"github.com/cloudnative-pg/machinery/pkg/fileutils"
 	"github.com/cloudnative-pg/machinery/pkg/log"
 	pgTime "github.com/cloudnative-pg/machinery/pkg/postgres/time"
-	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -151,13 +150,6 @@ func (b *BackupCommand) ensureCompatibility() error {
 	return b.barmanBackup.IsCompatible(postgresVers)
 }
 
-func (b *BackupCommand) retryWithRefreshedCluster(
-	ctx context.Context,
-	cb func() error,
-) error {
-	return resources.RetryWithRefreshedResource(ctx, b.Client, b.Cluster, cb)
-}
-
 // run executes the barman-cloud-backup command and updates the status
 // This method will take long time and is supposed to run inside a dedicated
 // goroutine.
@@ -187,18 +179,16 @@ func (b *BackupCommand) run(ctx context.Context) {
 	}
 
 	// add backup failed condition to the cluster
-	if failErr := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.UpdateAndRefresh(
-			ctx,
-			b.Client,
-			b.Cluster,
-			func(cluster *apiv1.Cluster) {
-				meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(err))
-				cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
-			},
-		)
-	}); failErr != nil {
-		b.Log.Error(failErr, "while setting cluster condition for failed backup")
+	if err := status.PatchWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		func(cluster *apiv1.Cluster) {
+			meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(err))
+			cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
+		},
+	); err != nil {
+		b.Log.Error(err, "while setting cluster condition for failed backup")
 		// We do not terminate here because it's more important to properly handle
 		// the backup maintenance activity than putting a condition in the cluster
 	}
@@ -213,9 +203,12 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error {
 	b.Recorder.Event(b.Backup, "Normal", "Starting", "Backup started")
 
 	// Update backup status in cluster conditions on startup
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
-	}); err != nil {
+	if err := status.PatchConditionsWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		apiv1.BackupStartingCondition,
+	); err != nil {
 		b.Log.Error(err, "Error changing backup condition (backup started)")
 		// We do not terminate here because we could still have a good backup
 		// even if we are unable to communicate with the Kubernetes API server
@@ -259,9 +252,12 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error {
 	}
 
 	// Update backup status in cluster conditions on backup completion
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
-	}); err != nil {
+	if err := status.PatchConditionsWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		apiv1.BackupSucceededCondition,
+	); err != nil {
 		b.Log.Error(err, "Can't update the cluster with the completed backup data")
 	}
 
@@ -297,21 +293,12 @@ func (b *BackupCommand) backupMaintenance(ctx context.Context) {
 		b.Log.Error(err, "while deleting Backups not present in the catalog")
 	}
 
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		origCluster := b.Cluster.DeepCopy()
-
-		// Set the first recoverability point and the last successful backup
-		b.Cluster.UpdateBackupTimes(
+	if err := status.PatchWithOptimisticLock(ctx, b.Client, b.Cluster, func(cluster *apiv1.Cluster) {
+		cluster.UpdateBackupTimes(
 			apiv1.BackupMethod(data.GetBackupMethod()),
 			data.GetFirstRecoverabilityPoint(),
 			data.GetLastSuccessfulBackupTime(),
 		)
-
-		// TODO(leonardoce) this deep equal is nauseating
-		if equality.Semantic.DeepEqual(origCluster.Status, b.Cluster.Status) {
-			return nil
-		}
-		return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster))
 	}); err != nil {
 		b.Log.Error(err, "while setting the firstRecoverabilityPoint and latestSuccessfulBackup")
 	}
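The backupMaintenance hunk above also drops the hand-rolled update (DeepCopy the cluster, mutate it, skip the patch when DeepEqual says nothing changed, then Status().Patch with MergeFrom) in favor of status.PatchWithOptimisticLock. The helper's body is not part of this diff; the sketch below is only an assumption about what an optimistic-lock status patch typically looks like with controller-runtime, not the actual implementation in pkg/resources/status.

```go
package example // illustrative only

import (
	"context"

	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"

	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
)

// patchStatusWithOptimisticLock is a hypothetical shape of an optimistic-lock
// status patch, sketched from the retry loop visible in conditions.go below.
func patchStatusWithOptimisticLock(
	ctx context.Context,
	c client.Client,
	cluster *apiv1.Cluster,
	tx func(*apiv1.Cluster),
) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// Always start from the latest version of the object.
		var current apiv1.Cluster
		if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &current); err != nil {
			return err
		}

		updated := current.DeepCopy()
		tx(updated)

		// The patch carries the resourceVersion of the object it was computed
		// against, so a concurrent writer surfaces as a conflict and the loop retries.
		if err := c.Status().Patch(ctx, updated, client.MergeFromWithOptions(
			&current, client.MergeFromWithOptimisticLock{},
		)); err != nil {
			return err
		}

		// Propagate the server's view back to the caller's object.
		updated.DeepCopyInto(cluster)
		return nil
	})
}
```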
46 changes: 21 additions & 25 deletions pkg/management/postgres/webserver/plugin_backup.go
@@ -33,7 +33,6 @@ import (
 	"github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/repository"
 	"github.com/cloudnative-pg/cloudnative-pg/internal/configuration"
 	"github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres"
-	"github.com/cloudnative-pg/cloudnative-pg/pkg/resources"
 	"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
 )
 
@@ -101,9 +100,12 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) {
 	b.Recorder.Event(b.Backup, "Normal", "Starting", "Backup started")
 
 	// Update backup status in cluster conditions on startup
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
-	}); err != nil {
+	if err := status.PatchConditionsWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		apiv1.BackupStartingCondition,
+	); err != nil {
 		contextLogger.Error(err, "Error changing backup condition (backup started)")
 		// We do not terminate here because we could still have a good backup
 		// even if we are unable to communicate with the Kubernetes API server
@@ -151,9 +153,12 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) {
 	}
 
 	// Update backup status in cluster conditions on backup completion
-	if err := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
-	}); err != nil {
+	if err := status.PatchConditionsWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		apiv1.BackupSucceededCondition,
+	); err != nil {
 		contextLogger.Error(err, "Can't update the cluster with the completed backup data")
 	}
 }
@@ -175,24 +180,15 @@ func (b *PluginBackupCommand) markBackupAsFailed(ctx context.Context, failure er
 	}
 
 	// add backup failed condition to the cluster
-	if failErr := b.retryWithRefreshedCluster(ctx, func() error {
-		return status.UpdateAndRefresh(
-			ctx,
-			b.Client,
-			b.Cluster,
-			func(cluster *apiv1.Cluster) {
-				meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(failure))
-				cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
-			},
-		)
-	}); failErr != nil {
+	if failErr := status.PatchWithOptimisticLock(
+		ctx,
+		b.Client,
+		b.Cluster,
+		func(cluster *apiv1.Cluster) {
+			meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(failure))
+			cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
+		},
+	); failErr != nil {
 		contextLogger.Error(failErr, "while setting cluster condition for failed backup")
 	}
 }
-
-func (b *PluginBackupCommand) retryWithRefreshedCluster(
-	ctx context.Context,
-	cb func() error,
-) error {
-	return resources.RetryWithRefreshedResource(ctx, b.Client, b.Cluster, cb)
-}
4 changes: 2 additions & 2 deletions pkg/reconciler/replicaclusterswitch/reconciler.go
@@ -90,7 +90,7 @@ func startTransition(ctx context.Context, cli client.Client, cluster *apiv1.Clus
 		return nil, fmt.Errorf("while fencing primary cluster to demote it: %w", err)
 	}
 
-	if err := status.UpdateAndRefresh(
+	if err := status.PatchWithOptimisticLock(
 		ctx,
 		cli,
 		cluster,
@@ -139,7 +139,7 @@ func cleanupTransitionMetadata(ctx context.Context, cli client.Client, cluster *
 		}
 	}
 
-	return status.UpdateAndRefresh(
+	return status.PatchWithOptimisticLock(
 		ctx,
 		cli,
 		cluster,
23 changes: 0 additions & 23 deletions pkg/resources/retry.go
@@ -16,28 +16,5 @@ limitations under the License.
 
 package resources
 
-import (
-	"context"
-
-	"k8s.io/client-go/util/retry"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-)
-
 // RetryAlways is a function that always returns true on any error encountered
 func RetryAlways(_ error) bool { return true }
-
-// RetryWithRefreshedResource updates the resource before invoking the cb
-func RetryWithRefreshedResource(
-	ctx context.Context,
-	cli client.Client,
-	resource client.Object,
-	cb func() error,
-) error {
-	return retry.OnError(retry.DefaultBackoff, RetryAlways, func() error {
-		if err := cli.Get(ctx, client.ObjectKeyFromObject(resource), resource); err != nil {
-			return err
-		}
-
-		return cb()
-	})
-}
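The deleted helper retried the whole read-and-callback on any error (retry.OnError with RetryAlways), whereas the status helpers that replace it retry only on update conflicts (the retry.RetryOnConflict loop visible in conditions.go below). A small, illustrative contrast of the two policies, using only client-go's retry package:

```go
package example // illustrative only

import (
	"k8s.io/client-go/util/retry"
)

// retryOnAnyError mirrors the policy of the removed RetryWithRefreshedResource:
// retry the whole operation with the default backoff, whatever the error was.
func retryOnAnyError(work func() error) error {
	return retry.OnError(retry.DefaultBackoff, func(error) bool { return true }, work)
}

// retryOnConflictOnly mirrors the policy used by the optimistic-lock helpers:
// retry only when the API server rejects the write with a resourceVersion conflict.
func retryOnConflictOnly(work func() error) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, work)
}
```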
93 changes: 0 additions & 93 deletions pkg/resources/retry_test.go

This file was deleted.

26 changes: 16 additions & 10 deletions pkg/resources/status/conditions.go
@@ -43,14 +43,6 @@ func PatchConditionsWithOptimisticLock(
 		return nil
 	}
 
-	applyConditions := func(cluster *apiv1.Cluster) bool {
-		changed := false
-		for _, c := range conditions {
-			changed = changed || meta.SetStatusCondition(&cluster.Status.Conditions, c)
-		}
-		return changed
-	}
-
 	var currentCluster apiv1.Cluster
 	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 		if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &currentCluster); err != nil {
@@ -70,12 +62,26 @@
 			return err
 		}
 
-		updatedCluster.DeepCopyInto(cluster)
-
 		return nil
 	}); err != nil {
 		return fmt.Errorf("while updating conditions: %w", err)
 	}
 
+	if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &currentCluster); err != nil {
+		return err
+	}
+
+	cluster.Status.Conditions = currentCluster.Status.Conditions
+
 	return nil
 }
+
+// applyConditions will apply the passed conditions to the cluster status.
+// returns true if there was a change to the existing object status, false otherwise.
+func applyConditions(cluster *apiv1.Cluster, conditions ...metav1.Condition) bool {
+	changed := false
+	for _, c := range conditions {
+		changed = changed || meta.SetStatusCondition(&cluster.Status.Conditions, c)
+	}
+	return changed
+}
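With applyConditions extracted to package level, PatchConditionsWithOptimisticLock can skip the patch entirely whenever meta.SetStatusCondition reports that nothing changed. A sketch of a test for that behavior follows; it is hypothetical, not part of this commit, and would have to live in the status package since the helper is unexported. The condition literal is made up for the example.

```go
package status // sketch of a hypothetical test; not part of this commit

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
)

func TestApplyConditionsReportsChanges(t *testing.T) {
	cluster := &apiv1.Cluster{}
	cond := metav1.Condition{
		Type:   "ExampleCondition", // made-up condition type, for illustration only
		Status: metav1.ConditionTrue,
		Reason: "Example",
	}

	// The first application adds the condition, so a change must be reported.
	if !applyConditions(cluster, cond) {
		t.Error("expected the first application to report a change")
	}

	// Applying the identical condition again must be a no-op.
	if applyConditions(cluster, cond) {
		t.Error("expected the second application to report no change")
	}
}
```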