fix: use optimistic locking when updating conditions (cloudnative-pg#6328)

Kubernetes does not implement strategic merge patch for CRDs, so every JSON
merge patch replaces the whole condition set with a new one.

A proposed patch may start from a Cluster that is not up to date; in that
case, applying it reverts the conditions to an older state.

This patch fixes the race condition by wrapping the update in a merge patch
that requires optimistic locking, retrying the conditions update when a
conflict occurs.

Fixes: cloudnative-pg#6317

Signed-off-by: Leonardo Cecchi <[email protected]>
Signed-off-by: Armando Ruocco <[email protected]>
Signed-off-by: Marco Nenciarini <[email protected]>
Co-authored-by: Armando Ruocco <[email protected]>
Co-authored-by: Marco Nenciarini <[email protected]>
3 people authored Dec 18, 2024
1 parent e22e7e9 commit 67cc547
Showing 12 changed files with 305 additions and 172 deletions.
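
In outline, the fix works like the following minimal sketch, built on controller-runtime primitives. The helper name and signature mirror the new call sites in the diff below, but the body is illustrative only: the actual implementation lives in pkg/resources/status and is not shown in this commit, and the retry policy here is an assumption.

package status

import (
    "context"

    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/util/retry"
    "sigs.k8s.io/controller-runtime/pkg/client"

    apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
)

// PatchConditionsWithOptimisticLock sets the given conditions on the cluster
// status. Sketch only: retry/backoff details are assumptions.
func PatchConditionsWithOptimisticLock(
    ctx context.Context,
    c client.Client,
    cluster *apiv1.Cluster,
    conditions ...metav1.Condition,
) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        // Start every attempt from a fresh read, so the patch base is current
        var current apiv1.Cluster
        if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &current); err != nil {
            return err
        }
        updated := current.DeepCopy()
        for _, condition := range conditions {
            meta.SetStatusCondition(&updated.Status.Conditions, condition)
        }
        // MergeFromWithOptimisticLock embeds resourceVersion into the JSON
        // merge patch, so a stale base produces a conflict (and a retry)
        // instead of silently replacing newer conditions.
        return c.Status().Patch(ctx, updated,
            client.MergeFromWithOptions(&current, client.MergeFromWithOptimisticLock{}))
    })
}

The retry loop re-reads the Cluster on every conflict, so each attempt computes its patch from the latest conditions instead of a stale snapshot.
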
8 changes: 4 additions & 4 deletions api/v1/cluster_conditions.go
@@ -22,7 +22,7 @@ import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 var (
     // BackupSucceededCondition is added to a backup
     // when it was completed correctly
-    BackupSucceededCondition = &metav1.Condition{
+    BackupSucceededCondition = metav1.Condition{
         Type:   string(ConditionBackup),
         Status: metav1.ConditionTrue,
         Reason: string(ConditionReasonLastBackupSucceeded),
@@ -31,7 +31,7 @@ var (
 
     // BackupStartingCondition is added to a backup
     // when it started
-    BackupStartingCondition = &metav1.Condition{
+    BackupStartingCondition = metav1.Condition{
         Type:   string(ConditionBackup),
         Status: metav1.ConditionFalse,
         Reason: string(ConditionBackupStarted),
@@ -40,8 +40,8 @@ var (
 
     // BuildClusterBackupFailedCondition builds
     // ConditionReasonLastBackupFailed condition
-    BuildClusterBackupFailedCondition = func(err error) *metav1.Condition {
-        return &metav1.Condition{
+    BuildClusterBackupFailedCondition = func(err error) metav1.Condition {
+        return metav1.Condition{
             Type:   string(ConditionBackup),
             Status: metav1.ConditionFalse,
             Reason: string(ConditionReasonLastBackupFailed),
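
A note on the pointer-to-value change above: meta.SetStatusCondition takes a metav1.Condition by value, so exposing the prebuilt conditions as values removes a dereference at every call site and, plausibly, avoids callers sharing (and accidentally mutating) the same package-level condition.
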
30 changes: 25 additions & 5 deletions internal/controller/backup_controller.go
@@ -45,11 +45,11 @@ import (
     cnpgiClient "github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/client"
     "github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/repository"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/certs"
-    "github.com/cloudnative-pg/cloudnative-pg/pkg/conditions"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres/webserver/client/remote"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/reconciler/backup/volumesnapshot"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/reconciler/persistentvolumeclaim"
+    resourcestatus "github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/specs"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
 )
@@ -417,7 +417,12 @@ func (r *BackupReconciler) reconcileSnapshotBackup(
         }
     }
 
-    if errCond := conditions.Patch(ctx, r.Client, cluster, apiv1.BackupStartingCondition); errCond != nil {
+    if errCond := resourcestatus.PatchConditionsWithOptimisticLock(
+        ctx,
+        r.Client,
+        cluster,
+        apiv1.BackupStartingCondition,
+    ); errCond != nil {
         contextLogger.Error(errCond, "Error while updating backup condition (backup starting)")
     }
 
@@ -440,7 +445,12 @@ func (r *BackupReconciler) reconcileSnapshotBackup(
         // and un-fence the Pod
         contextLogger.Error(err, "while executing snapshot backup")
         // Update backup status in cluster conditions
-        if errCond := conditions.Patch(ctx, r.Client, cluster, apiv1.BuildClusterBackupFailedCondition(err)); errCond != nil {
+        if errCond := resourcestatus.PatchConditionsWithOptimisticLock(
+            ctx,
+            r.Client,
+            cluster,
+            apiv1.BuildClusterBackupFailedCondition(err),
+        ); errCond != nil {
             contextLogger.Error(errCond, "Error while updating backup condition (backup snapshot failed)")
         }
 
@@ -453,7 +463,12 @@ func (r *BackupReconciler) reconcileSnapshotBackup(
         return res, nil
     }
 
-    if err := conditions.Patch(ctx, r.Client, cluster, apiv1.BackupSucceededCondition); err != nil {
+    if err := resourcestatus.PatchConditionsWithOptimisticLock(
+        ctx,
+        r.Client,
+        cluster,
+        apiv1.BackupSucceededCondition,
+    ); err != nil {
         contextLogger.Error(err, "Can't update the cluster with the completed snapshot backup data")
     }
 
@@ -633,7 +648,12 @@ func startInstanceManagerBackup(
     status.CommandError = stdout
 
     // Update backup status in cluster conditions
-    if errCond := conditions.Patch(ctx, client, cluster, apiv1.BuildClusterBackupFailedCondition(err)); errCond != nil {
+    if errCond := resourcestatus.PatchConditionsWithOptimisticLock(
+        ctx,
+        client,
+        cluster,
+        apiv1.BuildClusterBackupFailedCondition(err),
+    ); errCond != nil {
         log.FromContext(ctx).Error(errCond, "Error while updating backup condition (backup failed)")
     }
     return postgres.PatchBackupStatusAndRetry(ctx, client, backup)
49 changes: 0 additions & 49 deletions pkg/conditions/conditions.go

This file was deleted.

19 changes: 0 additions & 19 deletions pkg/conditions/doc.go

This file was deleted.

24 changes: 14 additions & 10 deletions pkg/management/postgres/backup.go
@@ -32,6 +32,7 @@ import (
     "github.com/cloudnative-pg/machinery/pkg/fileutils"
     "github.com/cloudnative-pg/machinery/pkg/log"
     pgTime "github.com/cloudnative-pg/machinery/pkg/postgres/time"
+    "k8s.io/apimachinery/pkg/api/equality"
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
@@ -41,9 +42,9 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/client"
 
     apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
-    "github.com/cloudnative-pg/cloudnative-pg/pkg/conditions"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/postgres"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/resources"
+    "github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
 
     // this is needed to correctly open the sql connection with the pgx driver
     _ "github.com/jackc/pgx/v5/stdlib"
@@ -187,12 +188,15 @@ func (b *BackupCommand) run(ctx context.Context) {
 
     // add backup failed condition to the cluster
     if failErr := b.retryWithRefreshedCluster(ctx, func() error {
-        origCluster := b.Cluster.DeepCopy()
-
-        meta.SetStatusCondition(&b.Cluster.Status.Conditions, *apiv1.BuildClusterBackupFailedCondition(err))
-
-        b.Cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
-        return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster))
+        return status.PatchWithOptimisticLock(
+            ctx,
+            b.Client,
+            b.Cluster,
+            func(cluster *apiv1.Cluster) {
+                meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(err))
+                cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
+            },
+        )
     }); failErr != nil {
         b.Log.Error(failErr, "while setting cluster condition for failed backup")
         // We do not terminate here because it's more important to properly handle
@@ -210,7 +214,7 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error {
 
     // Update backup status in cluster conditions on startup
     if err := b.retryWithRefreshedCluster(ctx, func() error {
-        return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
+        return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
     }); err != nil {
         b.Log.Error(err, "Error changing backup condition (backup started)")
         // We do not terminate here because we could still have a good backup
@@ -256,7 +260,7 @@ func (b *BackupCommand) takeBackup(ctx context.Context) error {
 
     // Update backup status in cluster conditions on backup completion
    if err := b.retryWithRefreshedCluster(ctx, func() error {
-        return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
+        return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
     }); err != nil {
         b.Log.Error(err, "Can't update the cluster with the completed backup data")
     }
@@ -303,7 +307,7 @@ func (b *BackupCommand) backupMaintenance(ctx context.Context) {
             data.GetLastSuccessfulBackupTime(),
         )
 
-        if reflect.DeepEqual(origCluster.Status, b.Cluster.Status) {
+        if equality.Semantic.DeepEqual(origCluster.Status, b.Cluster.Status) {
             return nil
         }
         return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster))
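
Two things are worth noting in the hunks above. First, the no-op check now uses equality.Semantic.DeepEqual, the semantic comparison used across Kubernetes, which treats values such as resource quantities and timestamps as equal when they are semantically equivalent, where reflect.DeepEqual would not. Second, the hand-rolled DeepCopy-plus-merge-patch is replaced by status.PatchWithOptimisticLock, which takes a mutation callback. A plausible sketch of that helper, reusing the imports of the earlier sketch plus k8s.io/apimachinery/pkg/api/equality (the real body is not part of this diff):

// PatchWithOptimisticLock applies tx to a fresh copy of the cluster and
// patches the status with optimistic locking. Sketch only; details assumed.
func PatchWithOptimisticLock(
    ctx context.Context,
    c client.Client,
    cluster *apiv1.Cluster,
    tx func(cluster *apiv1.Cluster),
) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        var current apiv1.Cluster
        if err := c.Get(ctx, client.ObjectKeyFromObject(cluster), &current); err != nil {
            return err
        }
        updated := current.DeepCopy()
        tx(updated) // the caller batches all of its status changes here

        if equality.Semantic.DeepEqual(current.Status, updated.Status) {
            return nil // nothing changed, skip the API call
        }
        return c.Status().Patch(ctx, updated,
            client.MergeFromWithOptions(&current, client.MergeFromWithOptimisticLock{}))
    })
}

The callback form lets a caller fold several status edits (a condition plus LastFailedBackup, as above) into a single patch attempt that is recomputed from a refreshed object on every conflict.
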
1 change: 1 addition & 0 deletions pkg/management/postgres/backup_test.go
@@ -125,6 +125,7 @@ var _ = Describe("testing backup command", func() {
             Client: fake.NewClientBuilder().
                 WithScheme(scheme.BuildWithAllKnownScheme()).
                 WithObjects(cluster, backup).
+                WithStatusSubresource(cluster, backup).
                 Build(),
             Recorder: &record.FakeRecorder{},
             Env:      os.Environ(),
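
A note on this one-line test change: recent controller-runtime fake clients only honor Status().Patch and Status().Update for objects explicitly registered via WithStatusSubresource, so the test must declare the status subresource for the optimistic-lock status patches above to work against the fake client.
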
15 changes: 10 additions & 5 deletions pkg/management/postgres/webserver/local.go
@@ -33,9 +33,9 @@ import (
 
     apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
     "github.com/cloudnative-pg/cloudnative-pg/internal/management/cache"
-    "github.com/cloudnative-pg/cloudnative-pg/pkg/conditions"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/management/url"
+    "github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
 )
@@ -245,17 +245,17 @@ type ArchiveStatusRequest struct {
     Error string `json:"error,omitempty"`
 }
 
-func (asr *ArchiveStatusRequest) getContinuousArchivingCondition() *metav1.Condition {
+func (asr *ArchiveStatusRequest) getContinuousArchivingCondition() metav1.Condition {
     if asr.Error != "" {
-        return &metav1.Condition{
+        return metav1.Condition{
             Type:    string(apiv1.ConditionContinuousArchiving),
             Status:  metav1.ConditionFalse,
             Reason:  string(apiv1.ConditionReasonContinuousArchivingFailing),
             Message: asr.Error,
         }
     }
 
-    return &metav1.Condition{
+    return metav1.Condition{
         Type:   string(apiv1.ConditionContinuousArchiving),
         Status: metav1.ConditionTrue,
         Reason: string(apiv1.ConditionReasonContinuousArchivingSuccess),
@@ -283,7 +283,12 @@ func (ws *localWebserverEndpoints) setWALArchiveStatusCondition(w http.ResponseWriter
         return
     }
 
-    if errCond := conditions.Patch(ctx, ws.typedClient, cluster, asr.getContinuousArchivingCondition()); errCond != nil {
+    if errCond := status.PatchConditionsWithOptimisticLock(
+        ctx,
+        ws.typedClient,
+        cluster,
+        asr.getContinuousArchivingCondition(),
+    ); errCond != nil {
         contextLogger.Error(errCond, "Error changing wal archiving condition",
             "condition", asr.getContinuousArchivingCondition())
         http.Error(
21 changes: 12 additions & 9 deletions pkg/management/postgres/webserver/plugin_backup.go
@@ -32,9 +32,9 @@ import (
     pluginClient "github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/client"
     "github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/repository"
     "github.com/cloudnative-pg/cloudnative-pg/internal/configuration"
-    "github.com/cloudnative-pg/cloudnative-pg/pkg/conditions"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres"
     "github.com/cloudnative-pg/cloudnative-pg/pkg/resources"
+    "github.com/cloudnative-pg/cloudnative-pg/pkg/resources/status"
 )
 
 // PluginBackupCommand represent a backup command that is being executed
@@ -102,7 +102,7 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) {
 
     // Update backup status in cluster conditions on startup
     if err := b.retryWithRefreshedCluster(ctx, func() error {
-        return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
+        return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupStartingCondition)
     }); err != nil {
         contextLogger.Error(err, "Error changing backup condition (backup started)")
         // We do not terminate here because we could still have a good backup
@@ -152,7 +152,7 @@ func (b *PluginBackupCommand) invokeStart(ctx context.Context) {
 
     // Update backup status in cluster conditions on backup completion
     if err := b.retryWithRefreshedCluster(ctx, func() error {
-        return conditions.Patch(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
+        return status.PatchConditionsWithOptimisticLock(ctx, b.Client, b.Cluster, apiv1.BackupSucceededCondition)
     }); err != nil {
         contextLogger.Error(err, "Can't update the cluster with the completed backup data")
     }
@@ -176,12 +176,15 @@ func (b *PluginBackupCommand) markBackupAsFailed(ctx context.Context, failure error) {
 
     // add backup failed condition to the cluster
     if failErr := b.retryWithRefreshedCluster(ctx, func() error {
-        origCluster := b.Cluster.DeepCopy()
-
-        meta.SetStatusCondition(&b.Cluster.Status.Conditions, *apiv1.BuildClusterBackupFailedCondition(failure))
-
-        b.Cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
-        return b.Client.Status().Patch(ctx, b.Cluster, client.MergeFrom(origCluster))
+        return status.PatchWithOptimisticLock(
+            ctx,
+            b.Client,
+            b.Cluster,
+            func(cluster *apiv1.Cluster) {
+                meta.SetStatusCondition(&cluster.Status.Conditions, apiv1.BuildClusterBackupFailedCondition(failure))
+                cluster.Status.LastFailedBackup = pgTime.GetCurrentTimestampWithFormat(time.RFC3339)
+            },
+        )
     }); failErr != nil {
         contextLogger.Error(failErr, "while setting cluster condition for failed backup")
     }