Skip to content

Commit

Permalink
fix: use optimistic locking when updating conditions (cloudnative-pg#6328)
Browse files Browse the repository at this point in the history

Kubernetes does not implement strategic merge for CRDs, and every JSON
merge patch will replace the condition set with a new one.

A proposed patch may be computed from a Cluster object that is not
up to date; in that case, applying it reverts the conditions to an
older state.

This patch fixes the race condition by encapsulating the conditions
update in a merge patch that requires optimistic locking, and by
retrying the update when needed.

Fixes: cloudnative-pg#6317

Signed-off-by: Leonardo Cecchi <[email protected]>
Signed-off-by: Armando Ruocco <[email protected]>
Signed-off-by: Marco Nenciarini <[email protected]>
Co-authored-by: Armando Ruocco <[email protected]>
Co-authored-by: Marco Nenciarini <[email protected]>
(cherry picked from commit 67cc547)
  • Loading branch information
leonardoce authored and mnencia committed Dec 18, 2024
1 parent 9a5e7d0 commit 347512e
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 170 deletions.
8 changes: 4 additions & 4 deletions api/v1/cluster_conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
var (
// BackupSucceededCondition is added to a backup
// when it was completed correctly
BackupSucceededCondition = &metav1.Condition{
BackupSucceededCondition = metav1.Condition{
Type: string(ConditionBackup),
Status: metav1.ConditionTrue,
Reason: string(ConditionReasonLastBackupSucceeded),
Expand All @@ -33,7 +33,7 @@ var (

// BackupStartingCondition is added to a backup
// when it started
BackupStartingCondition = &metav1.Condition{
BackupStartingCondition = metav1.Condition{
Type: string(ConditionBackup),
Status: metav1.ConditionFalse,
Reason: string(ConditionBackupStarted),
Expand All @@ -42,8 +42,8 @@ var (

// BuildClusterBackupFailedCondition builds
// ConditionReasonLastBackupFailed condition
BuildClusterBackupFailedCondition = func(err error) *metav1.Condition {
return &metav1.Condition{
BuildClusterBackupFailedCondition = func(err error) metav1.Condition {
return metav1.Condition{
Type: string(ConditionBackup),
Status: metav1.ConditionFalse,
Reason: string(ConditionReasonLastBackupFailed),
Expand Down
16 changes: 13 additions & 3 deletions internal/cmd/manager/walarchive/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (

apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
cacheClient "github.com/cloudnative-pg/cloudnative-pg/internal/management/cache/client"
"github.com/cloudnative-pg/cloudnative-pg/pkg/conditions"
"github.com/cloudnative-pg/cloudnative-pg/pkg/management"
"github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres/archiver"
"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
)

// errSwitchoverInProgress is raised when there is a switchover in progress
Expand Down Expand Up @@ -83,7 +83,12 @@ func NewCmd() *cobra.Command {
Reason: string(apiv1.ConditionReasonContinuousArchivingFailing),
Message: err.Error(),
}
if errCond := conditions.Patch(ctx, typedClient, cluster, &condition); errCond != nil {
if errCond := status.PatchConditionsWithOptimisticLock(
ctx,
typedClient,
cluster,
condition,
); errCond != nil {
contextLog.Error(errCond, "Error changing wal archiving condition (wal archiving failed)")
}
return err
Expand All @@ -96,7 +101,12 @@ func NewCmd() *cobra.Command {
Reason: string(apiv1.ConditionReasonContinuousArchivingSuccess),
Message: "Continuous archiving is working",
}
if errCond := conditions.Patch(ctx, typedClient, cluster, &condition); errCond != nil {
if errCond := status.PatchConditionsWithOptimisticLock(
ctx,
typedClient,
cluster,
condition,
); errCond != nil {
contextLog.Error(errCond, "Error changing wal archiving condition (wal archiving succeeded)")
}

Expand Down
30 changes: 25 additions & 5 deletions internal/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"

apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/pkg/conditions"
"github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres"
"github.com/cloudnative-pg/cloudnative-pg/pkg/reconciler/backup/volumesnapshot"
"github.com/cloudnative-pg/cloudnative-pg/pkg/reconciler/persistentvolumeclaim"
"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/instance"
resourcestatus "github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
"github.com/cloudnative-pg/cloudnative-pg/pkg/specs"
"github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
)
Expand Down Expand Up @@ -386,7 +386,12 @@ func (r *BackupReconciler) reconcileSnapshotBackup(
}
}

if errCond := conditions.Patch(ctx, r.Client, cluster, apiv1.BackupStartingCondition); errCond != nil {
if errCond := resourcestatus.PatchConditionsWithOptimisticLock(
ctx,
r.Client,
cluster,
apiv1.BackupStartingCondition,
); errCond != nil {
contextLogger.Error(errCond, "Error while updating backup condition (backup starting)")
}

Expand All @@ -409,7 +414,12 @@ func (r *BackupReconciler) reconcileSnapshotBackup(
// and un-fence the Pod
contextLogger.Error(err, "while executing snapshot backup")
// Update backup status in cluster conditions
if errCond := conditions.Patch(ctx, r.Client, cluster, apiv1.BuildClusterBackupFailedCondition(err)); errCond != nil {
if errCond := resourcestatus.PatchConditionsWithOptimisticLock(
ctx,
r.Client,
cluster,
apiv1.BuildClusterBackupFailedCondition(err),
); errCond != nil {
contextLogger.Error(errCond, "Error while updating backup condition (backup snapshot failed)")
}

Expand All @@ -422,7 +432,12 @@ func (r *BackupReconciler) reconcileSnapshotBackup(
return res, nil
}

if err := conditions.Patch(ctx, r.Client, cluster, apiv1.BackupSucceededCondition); err != nil {
if err := resourcestatus.PatchConditionsWithOptimisticLock(
ctx,
r.Client,
cluster,
apiv1.BackupSucceededCondition,
); err != nil {
contextLogger.Error(err, "Can't update the cluster with the completed snapshot backup data")
}

Expand Down Expand Up @@ -602,7 +617,12 @@ func startInstanceManagerBackup(
status.CommandError = stdout

// Update backup status in cluster conditions
if errCond := conditions.Patch(ctx, client, cluster, apiv1.BuildClusterBackupFailedCondition(err)); errCond != nil {
if errCond := resourcestatus.PatchConditionsWithOptimisticLock(
ctx,
client,
cluster,
apiv1.BuildClusterBackupFailedCondition(err),
); errCond != nil {
log.FromContext(ctx).Error(errCond, "Error while updating backup condition (backup failed)")
}
return postgres.PatchBackupStatusAndRetry(ctx, client, backup)
Expand Down
49 changes: 0 additions & 49 deletions pkg/conditions/conditions.go

This file was deleted.

19 changes: 0 additions & 19 deletions pkg/conditions/doc.go

This file was deleted.

24 changes: 14 additions & 10 deletions pkg/management/postgres/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cloudnative-pg/machinery/pkg/fileutils"
"github.com/cloudnative-pg/machinery/pkg/log"
pgTime "github.com/cloudnative-pg/machinery/pkg/postgres/time"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -41,9 +42,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/pkg/conditions"
"github.com/cloudnative-pg/cloudnative-pg/pkg/postgres"
"github.com/cloudnative-pg/cloudnative-pg/pkg/resources"
"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"

// this is needed to correctly open the sql connection with the pgx driver
_ "github.com/jackc/pgx/v5/stdlib"
Expand Down Expand Up @@ -187,12 +188,15 @@ func (b *BackupCommand) run(ctx context.Context) {

// add backup failed condition to the cluster
if failErr := b.retryWithRefreshedCluster(ctx, func() error {
origCluster := b.Cluster.DeepCopy()

meta.SetStatusCondition(&b.Cluster.Status.Conditions, *apiv1.BuildClusterBackupFailedCondition(err))

b.Cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster))
return status.PatchWithOptimisticLock(
ctx,
b.Client,
b.Cluster,
func(cluster *apiv1.Cluster) {
meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(err))
cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
},
)
}); failErr != nil {
b.Log.Error(failErr, "while setting cluster condition for failed backup")
// We do not terminate here because it's more important to properly handle
Expand All @@ -210,7 +214,7 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error {

// Update backup status in cluster conditions on startup
if err := b.retryWithRefreshedCluster(ctx, func() error {
return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
}); err != nil {
b.Log.Error(err, "Error changing backup condition (backup started)")
// We do not terminate here because we could still have a good backup
Expand Down Expand Up @@ -256,7 +260,7 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error {

// Update backup status in cluster conditions on backup completion
if err := b.retryWithRefreshedCluster(ctx, func() error {
return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
}); err != nil {
b.Log.Error(err, "Can't update the cluster with the completed backup data")
}
Expand Down Expand Up @@ -303,7 +307,7 @@ func (b *BackupCommand) backupMaintenance(ctx context.Context) {
data.GetLastSuccessfulBackupTime(),
)

if reflect.DeepEqual(origCluster.Status, b.Cluster.Status) {
if equality.Semantic.DeepEqual(origCluster.Status, b.Cluster.Status) {
return nil
}
return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster))
Expand Down
1 change: 1 addition & 0 deletions pkg/management/postgres/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ var _ = Describe("testing backup command", func() {
Client: fake.NewClientBuilder().
WithScheme(scheme.BuildWithAllKnownScheme()).
WithObjects(cluster, backup).
WithStatusSubresource(cluster, backup).
Build(),
Recorder: &record.FakeRecorder{},
Env: os.Environ(),
Expand Down
21 changes: 12 additions & 9 deletions pkg/management/postgres/webserver/plugin_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/pkg/conditions"
"github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres"
"github.com/cloudnative-pg/cloudnative-pg/pkg/resources"
"github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
)

// PluginBackupCommand represents a backup command that is being executed
Expand Down Expand Up @@ -83,7 +83,7 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) {

// Update backup status in cluster conditions on startup
if err := b.retryWithRefreshedCluster(ctx, func() error {
return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
}); err != nil {
contextLogger.Error(err, "Error changing backup condition (backup started)")
// We do not terminate here because we could still have a good backup
Expand Down Expand Up @@ -132,7 +132,7 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) {

// Update backup status in cluster conditions on backup completion
if err := b.retryWithRefreshedCluster(ctx, func() error {
return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
}); err != nil {
contextLogger.Error(err, "Can't update the cluster with the completed backup data")
}
Expand All @@ -156,12 +156,15 @@ func (b *PluginBackupCommand) markBackupAsFailed(ctx context.Context, failure er

// add backup failed condition to the cluster
if failErr := b.retryWithRefreshedCluster(ctx, func() error {
origCluster := b.Cluster.DeepCopy()

meta.SetStatusCondition(&b.Cluster.Status.Conditions, *apiv1.BuildClusterBackupFailedCondition(failure))

b.Cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster))
return status.PatchWithOptimisticLock(
ctx,
b.Client,
b.Cluster,
func(cluster *apiv1.Cluster) {
meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(failure))
cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
},
)
}); failErr != nil {
contextLogger.Error(failErr, "while setting cluster condition for failed backup")
}
Expand Down
Loading

0 comments on commit 347512e

Please sign in to comment.