From ee8ff1442eaea7d80dce45f48b9c461ab260ea65 Mon Sep 17 00:00:00 2001 From: Armando Ruocco Date: Mon, 12 Feb 2024 15:27:17 +0100 Subject: [PATCH] chore: executor changes Signed-off-by: Armando Ruocco --- internal/backup/backup.go | 13 ++--- internal/backup/constants.go | 3 ++ internal/backup/controldata.go | 10 ++-- internal/backup/executor.go | 98 +++++++++++++++++++--------------- 4 files changed, 72 insertions(+), 52 deletions(-) create mode 100644 internal/backup/constants.go diff --git a/internal/backup/backup.go b/internal/backup/backup.go index 8bb4831..d87a5a4 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -40,17 +40,17 @@ func (Implementation) Backup( ctx context.Context, request *backup.BackupRequest, ) (*backup.BackupResult, error) { - logging := logging.FromContext(ctx) + contextLogger := logging.FromContext(ctx) helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition) if err != nil { - logging.Error(err, "Error while decoding cluster definition from CNPG") + contextLogger.Error(err, "Error while decoding cluster definition from CNPG") return nil, err } backupObject, err := helper.DecodeBackup(request.BackupDefinition) if err != nil { - logging.Error(err, "Error while decoding backup definition from CNPG") + contextLogger.Error(err, "Error while decoding backup definition from CNPG") return nil, err } @@ -68,20 +68,21 @@ func (Implementation) Backup( helper.GetCluster(), backupObject, repository, + podIP, ) startedAt := time.Now() - logging.Info("Preparing physical backup") + contextLogger.Info("Preparing physical backup") if err := executor.Start(ctx); err != nil { return nil, err } - logging.Info("Copying files") + contextLogger.Info("Copying files") if err := executor.Backup(ctx); err != nil { return nil, err } - logging.Info("Finishing backup") + contextLogger.Info("Finishing backup") backupInfo, err := executor.Stop(ctx) if err != nil { return nil, err diff --git a/internal/backup/constants.go b/internal/backup/constants.go new file mode 100644 index 0000000..8403e82 --- /dev/null +++ b/internal/backup/constants.go @@ -0,0 +1,3 @@ +package backup + +const podIP = "127.0.0.1" diff --git a/internal/backup/controldata.go b/internal/backup/controldata.go index c550ca4..bf47388 100644 --- a/internal/backup/controldata.go +++ b/internal/backup/controldata.go @@ -21,8 +21,10 @@ func getPgControlData( ) (map[string]string, error) { contextLogger := logging.FromContext(ctx) - const connectionTimeout = 2 * time.Second - const requestTimeout = 30 * time.Second + const ( + connectionTimeout = 2 * time.Second + requestTimeout = 30 * time.Second + ) // We want a connection timeout to prevent waiting for the default // TCP connection timeout (30 seconds) on lost SYN packets @@ -36,7 +38,7 @@ func getPgControlData( } httpURL := url.Build(podIP, url.PathPGControlData, url.StatusPort) - req, err := http.NewRequestWithContext(ctx, "GET", httpURL, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, httpURL, nil) if err != nil { return nil, err } @@ -57,7 +59,7 @@ func getPgControlData( return nil, err } - if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { contextLogger.Info("Error while querying the pg_controldata endpoint", "statusCode", resp.StatusCode, "body", string(body)) diff --git a/internal/backup/executor.go b/internal/backup/executor.go index 6d31284..a40439e 100644 --- a/internal/backup/executor.go +++ b/internal/backup/executor.go @@ -16,16 +16,9 @@ import ( "github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging" ) -const ( - podIP = "127.0.0.1" - - snapshotTypeName = "type" - snapshotTypeBase = "base" - snapshotTypeTablespace = "tablespace" - - snapshotTablespaceOidName = "oid" - - currentWALFileControlFile = "Latest checkpoint's REDO WAL file" +var ( + errBackupNotStarted = fmt.Errorf("backup not started") + errBackupNotStopped = fmt.Errorf("backup not stopped") ) var backupModeBackoff = wait.Backoff{ @@ -42,9 +35,10 @@ type Executor struct { beginWal string endWal string - cluster *apiv1.Cluster - backup *apiv1.Backup - repository *Repository + cluster *apiv1.Cluster + backup *apiv1.Backup + repository *Repository + backupClientEndpoint string } // Tablespace represent a tablespace location @@ -57,42 +51,53 @@ type Tablespace struct { } // NewExecutor creates a new backup executor -func NewExecutor(cluster *apiv1.Cluster, backup *apiv1.Backup, repo *Repository) *Executor { +func NewExecutor(cluster *apiv1.Cluster, backup *apiv1.Backup, repo *Repository, endpoint string) *Executor { return &Executor{ - backupClient: webserver.NewBackupClient(), - cluster: cluster, - backup: backup, - repository: repo, + backupClient: webserver.NewBackupClient(), + cluster: cluster, + backup: backup, + repository: repo, + backupClientEndpoint: endpoint, } } // Start starts a backup by setting PostgreSQL in backup mode func (executor *Executor) Start(ctx context.Context) error { logger := logging.FromContext(ctx) - errBackupNotStarted := fmt.Errorf("backup not started") - var err error - executor.beginWal, err = executor.getCurrentWALFile(ctx) - if err != nil { - return err + var currentWALErr error + executor.beginWal, currentWALErr = executor.getCurrentWALFile(ctx) + if currentWALErr != nil { + return currentWALErr } - err = executor.backupClient.Start(ctx, podIP, webserver.StartBackupRequest{ + if err := executor.backupClient.Start(ctx, executor.backupClientEndpoint, webserver.StartBackupRequest{ ImmediateCheckpoint: true, WaitForArchive: true, BackupName: executor.backup.GetName(), Force: true, - }) - if err != nil { + }); err != nil { logger.Error(err, "while requesting new backup on PostgreSQL") return err } - logger.Info("Requesting PostgreSQL Backup mode") - err = retry.OnError(backupModeBackoff, func(e error) bool { - return e == errBackupNotStarted - }, func() error { - response, err := executor.backupClient.StatusWithErrors(ctx, podIP) + logger.Info("Requesting PostgreSQL Backup status") + if err := retry.OnError( + backupModeBackoff, + retryOnBackupNotStarted, + executor.ensureBackupStartedFunc(ctx)); err != nil { + return err + } + + logger.Info("Backup Mode started") + return nil +} + +func (executor *Executor) ensureBackupStartedFunc(ctx context.Context) func() error { + logger := logging.FromContext(ctx) + + return func() error { + response, err := executor.backupClient.StatusWithErrors(ctx, executor.backupClientEndpoint) if err != nil { return err } @@ -103,17 +108,23 @@ func (executor *Executor) Start(ctx context.Context) error { } return nil - }) - if err != nil { - return err } +} - logger.Info("Backup Mode started") - return nil +func retryOnBackupNotStarted(err error) bool { + return err == errBackupNotStarted } // Backup takes the snapshot of the data directory and the tablespace folder func (executor *Executor) Backup(ctx context.Context) error { + const snapshotTablespaceOidName = "oid" + + const ( + snapshotTypeName = "type" + snapshotTypeBase = "base" + snapshotTypeTablespace = "tablespace" + ) + logger := logging.FromContext(ctx) tablespaces, err := executor.getTablespaces(ctx) @@ -175,9 +186,8 @@ func (*Executor) getTablespaces(ctx context.Context) ([]Tablespace, error) { // Stop stops a backup and resume PostgreSQL normal operation func (executor *Executor) Stop(ctx context.Context) (*webserver.BackupResultData, error) { logger := logging.FromContext(ctx) - errBackupNotStopped := fmt.Errorf("backup not stopped") - err := executor.backupClient.Stop(ctx, podIP, webserver.StopBackupRequest{ + err := executor.backupClient.Stop(ctx, executor.backupClientEndpoint, webserver.StopBackupRequest{ BackupName: executor.backup.GetName(), }) if err != nil { @@ -187,10 +197,8 @@ func (executor *Executor) Stop(ctx context.Context) (*webserver.BackupResultData logger.Info("Stopping PostgreSQL Backup mode") var backupStatus webserver.BackupResultData - err = retry.OnError(backupModeBackoff, func(e error) bool { - return e == errBackupNotStopped - }, func() error { - response, err := executor.backupClient.StatusWithErrors(ctx, podIP) + err = retry.OnError(backupModeBackoff, retryOnBackupNotStopped, func() error { + response, err := executor.backupClient.StatusWithErrors(ctx, executor.backupClientEndpoint) if err != nil { return err } @@ -217,7 +225,13 @@ func (executor *Executor) Stop(ctx context.Context) (*webserver.BackupResultData return &backupStatus, err } +func retryOnBackupNotStopped(e error) bool { + return e == errBackupNotStopped +} + func (executor *Executor) getCurrentWALFile(ctx context.Context) (string, error) { + const currentWALFileControlFile = "Latest checkpoint's REDO WAL file" + controlDataOutput, err := getPgControlData(ctx) if err != nil { return "", err