Skip to content

Commit

Permalink
chore: executor changes
Browse files Browse the repository at this point in the history
Signed-off-by: Armando Ruocco <[email protected]>
  • Loading branch information
armru committed Feb 12, 2024
1 parent af08c05 commit ee8ff14
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 52 deletions.
13 changes: 7 additions & 6 deletions internal/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ func (Implementation) Backup(
ctx context.Context,
request *backup.BackupRequest,
) (*backup.BackupResult, error) {
logging := logging.FromContext(ctx)
contextLogger := logging.FromContext(ctx)

helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition)
if err != nil {
logging.Error(err, "Error while decoding cluster definition from CNPG")
contextLogger.Error(err, "Error while decoding cluster definition from CNPG")
return nil, err
}

backupObject, err := helper.DecodeBackup(request.BackupDefinition)
if err != nil {
logging.Error(err, "Error while decoding backup definition from CNPG")
contextLogger.Error(err, "Error while decoding backup definition from CNPG")
return nil, err
}

Expand All @@ -68,20 +68,21 @@ func (Implementation) Backup(
helper.GetCluster(),
backupObject,
repository,
podIP,
)

startedAt := time.Now()
logging.Info("Preparing physical backup")
contextLogger.Info("Preparing physical backup")
if err := executor.Start(ctx); err != nil {
return nil, err
}

logging.Info("Copying files")
contextLogger.Info("Copying files")
if err := executor.Backup(ctx); err != nil {
return nil, err
}

logging.Info("Finishing backup")
contextLogger.Info("Finishing backup")
backupInfo, err := executor.Stop(ctx)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions internal/backup/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package backup

// podIP is the loopback address. It is passed to NewExecutor as the backup
// client endpoint and used to build the pg_controldata status URL.
const podIP = "127.0.0.1"
10 changes: 6 additions & 4 deletions internal/backup/controldata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ func getPgControlData(
) (map[string]string, error) {
contextLogger := logging.FromContext(ctx)

const connectionTimeout = 2 * time.Second
const requestTimeout = 30 * time.Second
const (
connectionTimeout = 2 * time.Second
requestTimeout = 30 * time.Second
)

// We want a connection timeout to prevent waiting for the default
// TCP connection timeout (30 seconds) on lost SYN packets
Expand All @@ -36,7 +38,7 @@ func getPgControlData(
}

httpURL := url.Build(podIP, url.PathPGControlData, url.StatusPort)
req, err := http.NewRequestWithContext(ctx, "GET", httpURL, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, httpURL, nil)
if err != nil {
return nil, err
}
Expand All @@ -57,7 +59,7 @@ func getPgControlData(
return nil, err
}

if resp.StatusCode != 200 {
if resp.StatusCode != http.StatusOK {
contextLogger.Info("Error while querying the pg_controldata endpoint",
"statusCode", resp.StatusCode,
"body", string(body))
Expand Down
98 changes: 56 additions & 42 deletions internal/backup/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,9 @@ import (
"github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging"
)

const (
podIP = "127.0.0.1"

snapshotTypeName = "type"
snapshotTypeBase = "base"
snapshotTypeTablespace = "tablespace"

snapshotTablespaceOidName = "oid"

currentWALFileControlFile = "Latest checkpoint's REDO WAL file"
// Sentinel errors used as retry conditions while polling the backup status.
var (
// errBackupNotStarted is matched by retryOnBackupNotStarted, so that
// retry.OnError in Start keeps retrying while this error is returned.
errBackupNotStarted = fmt.Errorf("backup not started")
// errBackupNotStopped is matched by retryOnBackupNotStopped, so that
// retry.OnError in Stop keeps retrying while this error is returned.
errBackupNotStopped = fmt.Errorf("backup not stopped")
)

var backupModeBackoff = wait.Backoff{
Expand All @@ -42,9 +35,10 @@ type Executor struct {
beginWal string
endWal string

cluster *apiv1.Cluster
backup *apiv1.Backup
repository *Repository
cluster *apiv1.Cluster
backup *apiv1.Backup
repository *Repository
backupClientEndpoint string
}

// Tablespace represents a tablespace location
Expand All @@ -57,42 +51,53 @@ type Tablespace struct {
}

// NewExecutor creates a new backup executor
func NewExecutor(cluster *apiv1.Cluster, backup *apiv1.Backup, repo *Repository) *Executor {
func NewExecutor(cluster *apiv1.Cluster, backup *apiv1.Backup, repo *Repository, endpoint string) *Executor {
return &Executor{
backupClient: webserver.NewBackupClient(),
cluster: cluster,
backup: backup,
repository: repo,
backupClient: webserver.NewBackupClient(),
cluster: cluster,
backup: backup,
repository: repo,
backupClientEndpoint: endpoint,
}
}

// Start starts a backup by setting PostgreSQL in backup mode
func (executor *Executor) Start(ctx context.Context) error {
logger := logging.FromContext(ctx)
errBackupNotStarted := fmt.Errorf("backup not started")

var err error
executor.beginWal, err = executor.getCurrentWALFile(ctx)
if err != nil {
return err
var currentWALErr error
executor.beginWal, currentWALErr = executor.getCurrentWALFile(ctx)
if currentWALErr != nil {
return currentWALErr
}

err = executor.backupClient.Start(ctx, podIP, webserver.StartBackupRequest{
if err := executor.backupClient.Start(ctx, executor.backupClientEndpoint, webserver.StartBackupRequest{
ImmediateCheckpoint: true,
WaitForArchive: true,
BackupName: executor.backup.GetName(),
Force: true,
})
if err != nil {
}); err != nil {
logger.Error(err, "while requesting new backup on PostgreSQL")
return err
}

logger.Info("Requesting PostgreSQL Backup mode")
err = retry.OnError(backupModeBackoff, func(e error) bool {
return e == errBackupNotStarted
}, func() error {
response, err := executor.backupClient.StatusWithErrors(ctx, podIP)
logger.Info("Requesting PostgreSQL Backup status")
if err := retry.OnError(
backupModeBackoff,
retryOnBackupNotStarted,
executor.ensureBackupStartedFunc(ctx)); err != nil {
return err
}

logger.Info("Backup Mode started")
return nil
}

func (executor *Executor) ensureBackupStartedFunc(ctx context.Context) func() error {
logger := logging.FromContext(ctx)

return func() error {
response, err := executor.backupClient.StatusWithErrors(ctx, executor.backupClientEndpoint)
if err != nil {
return err
}
Expand All @@ -103,17 +108,23 @@ func (executor *Executor) Start(ctx context.Context) error {
}

return nil
})
if err != nil {
return err
}
}

logger.Info("Backup Mode started")
return nil
// retryOnBackupNotStarted reports whether err is the errBackupNotStarted
// sentinel. Start passes it to retry.OnError as the retry predicate, so any
// other error stops the retry loop.
//
// NOTE(review): the comparison uses ==, so a wrapped errBackupNotStarted would
// not match; consider errors.Is if the sentinel is ever wrapped.
func retryOnBackupNotStarted(err error) bool {
return err == errBackupNotStarted
}

// Backup takes the snapshot of the data directory and the tablespace folder
func (executor *Executor) Backup(ctx context.Context) error {
const snapshotTablespaceOidName = "oid"

const (
snapshotTypeName = "type"
snapshotTypeBase = "base"
snapshotTypeTablespace = "tablespace"
)

logger := logging.FromContext(ctx)

tablespaces, err := executor.getTablespaces(ctx)
Expand Down Expand Up @@ -175,9 +186,8 @@ func (*Executor) getTablespaces(ctx context.Context) ([]Tablespace, error) {
// Stop stops a backup and resumes PostgreSQL normal operation
func (executor *Executor) Stop(ctx context.Context) (*webserver.BackupResultData, error) {
logger := logging.FromContext(ctx)
errBackupNotStopped := fmt.Errorf("backup not stopped")

err := executor.backupClient.Stop(ctx, podIP, webserver.StopBackupRequest{
err := executor.backupClient.Stop(ctx, executor.backupClientEndpoint, webserver.StopBackupRequest{
BackupName: executor.backup.GetName(),
})
if err != nil {
Expand All @@ -187,10 +197,8 @@ func (executor *Executor) Stop(ctx context.Context) (*webserver.BackupResultData

logger.Info("Stopping PostgreSQL Backup mode")
var backupStatus webserver.BackupResultData
err = retry.OnError(backupModeBackoff, func(e error) bool {
return e == errBackupNotStopped
}, func() error {
response, err := executor.backupClient.StatusWithErrors(ctx, podIP)
err = retry.OnError(backupModeBackoff, retryOnBackupNotStopped, func() error {
response, err := executor.backupClient.StatusWithErrors(ctx, executor.backupClientEndpoint)
if err != nil {
return err
}
Expand All @@ -217,7 +225,13 @@ func (executor *Executor) Stop(ctx context.Context) (*webserver.BackupResultData
return &backupStatus, err
}

// retryOnBackupNotStopped reports whether e is the errBackupNotStopped
// sentinel. Stop passes it to retry.OnError as the retry predicate, so any
// other error stops the retry loop.
//
// NOTE(review): the comparison uses ==, so a wrapped errBackupNotStopped would
// not match; consider errors.Is if the sentinel is ever wrapped.
func retryOnBackupNotStopped(e error) bool {
return e == errBackupNotStopped
}

func (executor *Executor) getCurrentWALFile(ctx context.Context) (string, error) {
const currentWALFileControlFile = "Latest checkpoint's REDO WAL file"

controlDataOutput, err := getPgControlData(ctx)
if err != nil {
return "", err
Expand Down

0 comments on commit ee8ff14

Please sign in to comment.